diff --git a/build/dlv.Dockerfile b/build/dlv.Dockerfile index 4a2bdc33..1d865490 100644 --- a/build/dlv.Dockerfile +++ b/build/dlv.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19 as delve +FROM golang:1.20 as delve RUN curl --location --output delve-1.20.1.tar.gz https://github.com/go-delve/delve/archive/v1.20.1.tar.gz \ && tar xzf delve-1.20.1.tar.gz RUN cd delve-1.20.1 && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /go/dlv -ldflags '-extldflags "-static"' ./cmd/dlv/ diff --git a/build/test.Dockerfile b/build/test.Dockerfile index 3f085bbd..bcc815ff 100644 --- a/build/test.Dockerfile +++ b/build/test.Dockerfile @@ -2,6 +2,4 @@ FROM naison/kubevpn:latest WORKDIR /app -RUN apt-get clean && apt-get update && apt-get install -y iperf3 - COPY bin/kubevpn /usr/local/bin/kubevpn \ No newline at end of file diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index 83eba598..00164431 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -14,7 +14,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/wencaiwulue/kubevpn/pkg/config" - pkgtun "github.com/wencaiwulue/kubevpn/pkg/tun" + "github.com/wencaiwulue/kubevpn/pkg/tun" "github.com/wencaiwulue/kubevpn/pkg/util" ) @@ -148,6 +148,7 @@ func (h tunHandler) printRoute() { select { case <-time.Tick(time.Second * 5): var i int + var sb strings.Builder h.routeNAT.Range(func(key string, value []net.Addr) { i++ var s []string @@ -156,8 +157,11 @@ func (h tunHandler) printRoute() { s = append(s, addr.String()) } } - fmt.Printf("to: %s, route: %s\n", key, strings.Join(s, " ")) + if len(s) != 0 { + sb.WriteString(fmt.Sprintf("to: %s, route: %s\n", key, strings.Join(s, " "))) + } }) + fmt.Println(sb.String()) fmt.Println(i) } } @@ -171,6 +175,9 @@ type Device struct { tunInbound chan *DataElem tunOutbound chan *DataElem + // your main logic + tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) + chExit chan error } @@ -232,7 +239,7 @@ func (d *Device) Close() { } func (d *Device) heartbeats() { - tunIface, err := pkgtun.GetInterface() + tunIface, err := tun.GetInterface() if err != nil { return } @@ -356,51 +363,54 @@ func genICMPPacketIPv6(src net.IP, dst net.IP) ([]byte, error) { return buf.Bytes(), nil } -func (d *Device) Start() { +func (d *Device) Start(ctx context.Context) { go d.readFromTun() for i := 0; i < d.thread; i++ { go d.parseIPHeader() } + go d.tunInboundHandler(d.tunInbound, d.tunOutbound) go d.writeToTun() go d.heartbeats() + + select { + case err := <-d.chExit: + log.Error(err) + return + case <-ctx.Done(): + return + } } -func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) { +func (d *Device) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) { + d.tunInboundHandler = handler +} + +func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) { go h.printRoute() - tun := &Device{ - tun: tunConn, + device := &Device{ + tun: tun, thread: MaxThread, tunInboundRaw: make(chan *DataElem, MaxSize), tunInbound: make(chan *DataElem, MaxSize), tunOutbound: make(chan *DataElem, MaxSize), chExit: h.chExit, } - defer tun.Close() - tun.Start() - - for { - select { - case <-h.chExit: - return - case <-ctx.Done(): - return - default: - } - func() { - cancel, cancelFunc := context.WithCancel(ctx) - defer cancelFunc() - var lc net.ListenConfig - packetConn, err := lc.ListenPacket(cancel, "udp", h.node.Addr) + device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { + for { + packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", h.node.Addr) if err != nil { log.Debugf("[udp] can not listen %s, err: %v", h.node.Addr, err) return } - err = h.transportTun(cancel, tun, packetConn) + err = transportTun(ctx, tunInbound, tunOutbound, packetConn, h.routeNAT, h.routeConnNAT) if err != nil { - log.Debugf("[tun] %s: %v", tunConn.LocalAddr(), err) + log.Debugf("[tun] %s: %v", tun.LocalAddr(), err) } - }() - } + } + }) + + defer device.Close() + device.Start(ctx) } type DataElem struct { @@ -425,7 +435,9 @@ type Peer struct { connInbound chan *udpElem parsedConnInfo chan *udpElem - tun *Device + tunInbound <-chan *DataElem + tunOutbound chan<- *DataElem + routeNAT *NAT // map[srcIP]net.Conn // routeConnNAT sync.Map @@ -457,6 +469,30 @@ func (p *Peer) readFromConn() { } } +func (p *Peer) readFromTCPConn() { + for packet := range Chan { + u := &udpElem{ + data: packet.Data[:], + length: int(packet.DataLength), + } + b := packet.Data + if util.IsIPv4(packet.Data) { + // ipv4.ParseHeader + u.src = net.IPv4(b[12], b[13], b[14], b[15]) + u.dst = net.IPv4(b[16], b[17], b[18], b[19]) + } else if util.IsIPv6(packet.Data) { + // ipv6.ParseHeader + u.src = b[8:24] + u.dst = b[24:40] + } else { + log.Errorf("[tun] unknown packet") + continue + } + log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) + p.parsedConnInfo <- u + } +} + func (p *Peer) parseHeader() { var firstIPv4, firstIPv6 = true, true for e := range p.connInbound { @@ -490,7 +526,7 @@ func (p *Peer) parseHeader() { } } -func (p *Peer) route() { +func (p *Peer) routePeer() { for e := range p.parsedConnInfo { if routeToAddr := p.routeNAT.RouteTo(e.dst); routeToAddr != nil { log.Debugf("[tun] find route: %s -> %s", e.dst, routeToAddr) @@ -509,7 +545,7 @@ func (p *Peer) route() { } config.LPool.Put(e.data[:]) } else { - p.tun.tunOutbound <- &DataElem{ + p.tunOutbound <- &DataElem{ data: e.data, length: e.length, src: e.src, @@ -519,86 +555,66 @@ func (p *Peer) route() { } } +func (p *Peer) routeTUN() { + for e := range p.tunInbound { + if addr := p.routeNAT.RouteTo(e.dst); addr != nil { + log.Debugf("[tun] find route: %s -> %s", e.dst, addr) + _, err := p.conn.WriteTo(e.data[:e.length], addr) + config.LPool.Put(e.data[:]) + if err != nil { + log.Debugf("[tun] can not route: %s -> %s", e.dst, addr) + p.sendErr(err) + return + } + } else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok { + dgram := newDatagramPacket(e.data[:e.length]) + err := dgram.Write(conn.(net.Conn)) + config.LPool.Put(e.data[:]) + if err != nil { + log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + p.sendErr(err) + return + } + } else { + config.LPool.Put(e.data[:]) + log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst)) + } + } +} + func (p *Peer) Start() { go p.readFromConn() + go p.readFromTCPConn() for i := 0; i < p.thread; i++ { go p.parseHeader() } - go p.route() + go p.routePeer() + go p.routeTUN() } func (p *Peer) Close() { p.conn.Close() } -func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error { - errChan := make(chan error, 2) +func transportTun(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, nat *NAT, connNAT *sync.Map) error { p := &Peer{ - conn: conn, + conn: packetConn, thread: MaxThread, connInbound: make(chan *udpElem, MaxSize), parsedConnInfo: make(chan *udpElem, MaxSize), - tun: tun, - routeNAT: h.routeNAT, - routeConnNAT: h.routeConnNAT, - errChan: errChan, + tunInbound: tunInbound, + tunOutbound: tunOutbound, + routeNAT: nat, + routeConnNAT: connNAT, + errChan: make(chan error, 2), } defer p.Close() p.Start() - go func() { - for packet := range Chan { - u := &udpElem{ - data: packet.Data[:], - length: int(packet.DataLength), - } - b := packet.Data - if util.IsIPv4(packet.Data) { - // ipv4.ParseHeader - u.src = net.IPv4(b[12], b[13], b[14], b[15]) - u.dst = net.IPv4(b[16], b[17], b[18], b[19]) - } else if util.IsIPv6(packet.Data) { - // ipv6.ParseHeader - u.src = b[8:24] - u.dst = b[24:40] - } else { - log.Errorf("[tun] unknown packet") - continue - } - log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) - p.parsedConnInfo <- u - } - }() - go func() { - for e := range tun.tunInbound { - if addr := h.routeNAT.RouteTo(e.dst); addr != nil { - log.Debugf("[tun] find route: %s -> %s", e.dst, addr) - _, err := conn.WriteTo(e.data[:e.length], addr) - config.LPool.Put(e.data[:]) - if err != nil { - log.Debugf("[tun] can not route: %s -> %s", e.dst, addr) - p.sendErr(err) - return - } - } else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok { - dgram := newDatagramPacket(e.data[:e.length]) - err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(e.data[:]) - if err != nil { - log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) - p.sendErr(err) - return - } - } else { - config.LPool.Put(e.data[:]) - log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst)) - } - } - }() - select { - case err := <-errChan: + case err := <-p.errChan: + log.Errorf(err.Error()) return err case <-ctx.Done(): return nil diff --git a/pkg/core/tunhandlercli.go b/pkg/core/tunhandlercli.go deleted file mode 100644 index 60075d46..00000000 --- a/pkg/core/tunhandlercli.go +++ /dev/null @@ -1,121 +0,0 @@ -package core - -import ( - "context" - "errors" - "net" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/wencaiwulue/kubevpn/pkg/config" -) - -func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { - d := &Device{ - tun: tun, - thread: MaxThread, - tunInboundRaw: make(chan *DataElem, MaxSize), - tunInbound: make(chan *DataElem, MaxSize), - tunOutbound: make(chan *DataElem, MaxSize), - chExit: h.chExit, - } - defer d.Close() - d.Start() - - remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote) - if err != nil { - log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err) - return - } - - for i := 0; i < MaxConn; i++ { - go func() { - for { - func() { - cancel, cancelFunc := context.WithCancel(ctx) - defer cancelFunc() - var packetConn net.PacketConn - defer func() { - if packetConn != nil { - _ = packetConn.Close() - } - }() - if !h.chain.IsEmpty() { - cc, errs := h.chain.DialContext(cancel) - if errs != nil { - log.Debug(errs) - time.Sleep(time.Second * 5) - return - } - var ok bool - if packetConn, ok = cc.(net.PacketConn); !ok { - errs = errors.New("not a packet connection") - log.Errorf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, errs) - return - } - } else { - var errs error - var lc net.ListenConfig - packetConn, errs = lc.ListenPacket(cancel, "udp", "") - if errs != nil { - log.Error(errs) - return - } - } - errs := h.transportTunCli(cancel, d, packetConn, remoteAddr) - if errs != nil { - log.Debugf("[tun] %s: %v", tun.LocalAddr(), errs) - } - }() - } - }() - } - - select { - case s := <-h.chExit: - log.Error(s) - return - case <-ctx.Done(): - return - } -} - -func (h *tunHandler) transportTunCli(ctx context.Context, d *Device, conn net.PacketConn, remoteAddr net.Addr) error { - errChan := make(chan error, 2) - defer conn.Close() - - go func() { - for e := range d.tunInbound { - if e.src.Equal(e.dst) { - d.tunOutbound <- e - continue - } - _, err := conn.WriteTo(e.data[:e.length], remoteAddr) - config.LPool.Put(e.data[:]) - if err != nil { - errChan <- err - return - } - } - }() - - go func() { - for { - b := config.LPool.Get().([]byte) - n, _, err := conn.ReadFrom(b[:]) - if err != nil { - errChan <- err - return - } - d.tunOutbound <- &DataElem{data: b[:], length: n} - } - }() - - select { - case err := <-errChan: - return err - case <-ctx.Done(): - return nil - } -} diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go new file mode 100644 index 00000000..c1e2fde7 --- /dev/null +++ b/pkg/core/tunhandlerclient.go @@ -0,0 +1,107 @@ +package core + +import ( + "context" + "errors" + "net" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/wencaiwulue/kubevpn/pkg/config" +) + +func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { + remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote) + if err != nil { + log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err) + return + } + + device := &Device{ + tun: tun, + thread: MaxThread, + tunInboundRaw: make(chan *DataElem, MaxSize), + tunInbound: make(chan *DataElem, MaxSize), + tunOutbound: make(chan *DataElem, MaxSize), + chExit: h.chExit, + } + device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { + for { + packetConn, err := getRemotePacketConn(ctx, h.chain) + if err != nil { + log.Debugf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, err) + time.Sleep(time.Second * 2) + continue + } + err = transportTunClient(ctx, tunInbound, tunOutbound, packetConn, remoteAddr) + if err != nil { + log.Debugf("[tun] %s: %v", tun.LocalAddr(), err) + } + } + }) + + defer device.Close() + device.Start(ctx) +} + +func getRemotePacketConn(ctx context.Context, chain *Chain) (net.PacketConn, error) { + var packetConn net.PacketConn + if !chain.IsEmpty() { + cc, err := chain.DialContext(ctx) + if err != nil { + return nil, err + } + var ok bool + if packetConn, ok = cc.(net.PacketConn); !ok { + return nil, errors.New("not a packet connection") + } + } else { + var err error + var lc net.ListenConfig + packetConn, err = lc.ListenPacket(ctx, "udp", "") + if err != nil { + return nil, err + } + } + return packetConn, nil +} + +func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, remoteAddr net.Addr) error { + errChan := make(chan error, 2) + defer packetConn.Close() + + go func() { + for e := range tunInbound { + if e.src.Equal(e.dst) { + tunOutbound <- e + continue + } + _, err := packetConn.WriteTo(e.data[:e.length], remoteAddr) + config.LPool.Put(e.data[:]) + if err != nil { + errChan <- err + return + } + } + }() + + go func() { + for { + b := config.LPool.Get().([]byte) + n, _, err := packetConn.ReadFrom(b[:]) + if err != nil { + errChan <- err + return + } + tunOutbound <- &DataElem{data: b[:], length: n} + } + }() + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return nil + } +} diff --git a/pkg/dev/dockerrun.go b/pkg/dev/dockerrun.go index e17287ef..7fbef562 100644 --- a/pkg/dev/dockerrun.go +++ b/pkg/dev/dockerrun.go @@ -31,7 +31,7 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, errCh *chan err resp, errAttach := dockerCli.Client().ContainerAttach(ctx, containerID, options) if errAttach != nil { - return nil, errAttach + return nil, fmt.Errorf("failed to attach to container: %s, err: %v", containerID, errAttach) } var (