mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-11-01 11:12:47 +08:00
feat: optimize code
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
FROM golang:1.19 as delve
|
||||
FROM golang:1.20 as delve
|
||||
RUN curl --location --output delve-1.20.1.tar.gz https://github.com/go-delve/delve/archive/v1.20.1.tar.gz \
|
||||
&& tar xzf delve-1.20.1.tar.gz
|
||||
RUN cd delve-1.20.1 && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /go/dlv -ldflags '-extldflags "-static"' ./cmd/dlv/
|
||||
|
||||
@@ -2,6 +2,4 @@ FROM naison/kubevpn:latest
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN apt-get clean && apt-get update && apt-get install -y iperf3
|
||||
|
||||
COPY bin/kubevpn /usr/local/bin/kubevpn
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/pkg/config"
|
||||
pkgtun "github.com/wencaiwulue/kubevpn/pkg/tun"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/tun"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/util"
|
||||
)
|
||||
|
||||
@@ -148,6 +148,7 @@ func (h tunHandler) printRoute() {
|
||||
select {
|
||||
case <-time.Tick(time.Second * 5):
|
||||
var i int
|
||||
var sb strings.Builder
|
||||
h.routeNAT.Range(func(key string, value []net.Addr) {
|
||||
i++
|
||||
var s []string
|
||||
@@ -156,8 +157,11 @@ func (h tunHandler) printRoute() {
|
||||
s = append(s, addr.String())
|
||||
}
|
||||
}
|
||||
fmt.Printf("to: %s, route: %s\n", key, strings.Join(s, " "))
|
||||
if len(s) != 0 {
|
||||
sb.WriteString(fmt.Sprintf("to: %s, route: %s\n", key, strings.Join(s, " ")))
|
||||
}
|
||||
})
|
||||
fmt.Println(sb.String())
|
||||
fmt.Println(i)
|
||||
}
|
||||
}
|
||||
@@ -171,6 +175,9 @@ type Device struct {
|
||||
tunInbound chan *DataElem
|
||||
tunOutbound chan *DataElem
|
||||
|
||||
// your main logic
|
||||
tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)
|
||||
|
||||
chExit chan error
|
||||
}
|
||||
|
||||
@@ -232,7 +239,7 @@ func (d *Device) Close() {
|
||||
}
|
||||
|
||||
func (d *Device) heartbeats() {
|
||||
tunIface, err := pkgtun.GetInterface()
|
||||
tunIface, err := tun.GetInterface()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -356,51 +363,54 @@ func genICMPPacketIPv6(src net.IP, dst net.IP) ([]byte, error) {
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func (d *Device) Start() {
|
||||
func (d *Device) Start(ctx context.Context) {
|
||||
go d.readFromTun()
|
||||
for i := 0; i < d.thread; i++ {
|
||||
go d.parseIPHeader()
|
||||
}
|
||||
go d.tunInboundHandler(d.tunInbound, d.tunOutbound)
|
||||
go d.writeToTun()
|
||||
go d.heartbeats()
|
||||
|
||||
select {
|
||||
case err := <-d.chExit:
|
||||
log.Error(err)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) {
|
||||
func (d *Device) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) {
|
||||
d.tunInboundHandler = handler
|
||||
}
|
||||
|
||||
func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) {
|
||||
go h.printRoute()
|
||||
tun := &Device{
|
||||
tun: tunConn,
|
||||
device := &Device{
|
||||
tun: tun,
|
||||
thread: MaxThread,
|
||||
tunInboundRaw: make(chan *DataElem, MaxSize),
|
||||
tunInbound: make(chan *DataElem, MaxSize),
|
||||
tunOutbound: make(chan *DataElem, MaxSize),
|
||||
chExit: h.chExit,
|
||||
}
|
||||
defer tun.Close()
|
||||
tun.Start()
|
||||
|
||||
device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) {
|
||||
for {
|
||||
select {
|
||||
case <-h.chExit:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
func() {
|
||||
cancel, cancelFunc := context.WithCancel(ctx)
|
||||
defer cancelFunc()
|
||||
var lc net.ListenConfig
|
||||
packetConn, err := lc.ListenPacket(cancel, "udp", h.node.Addr)
|
||||
packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", h.node.Addr)
|
||||
if err != nil {
|
||||
log.Debugf("[udp] can not listen %s, err: %v", h.node.Addr, err)
|
||||
return
|
||||
}
|
||||
err = h.transportTun(cancel, tun, packetConn)
|
||||
err = transportTun(ctx, tunInbound, tunOutbound, packetConn, h.routeNAT, h.routeConnNAT)
|
||||
if err != nil {
|
||||
log.Debugf("[tun] %s: %v", tunConn.LocalAddr(), err)
|
||||
log.Debugf("[tun] %s: %v", tun.LocalAddr(), err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
})
|
||||
|
||||
defer device.Close()
|
||||
device.Start(ctx)
|
||||
}
|
||||
|
||||
type DataElem struct {
|
||||
@@ -425,7 +435,9 @@ type Peer struct {
|
||||
connInbound chan *udpElem
|
||||
parsedConnInfo chan *udpElem
|
||||
|
||||
tun *Device
|
||||
tunInbound <-chan *DataElem
|
||||
tunOutbound chan<- *DataElem
|
||||
|
||||
routeNAT *NAT
|
||||
// map[srcIP]net.Conn
|
||||
// routeConnNAT sync.Map
|
||||
@@ -457,6 +469,30 @@ func (p *Peer) readFromConn() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) readFromTCPConn() {
|
||||
for packet := range Chan {
|
||||
u := &udpElem{
|
||||
data: packet.Data[:],
|
||||
length: int(packet.DataLength),
|
||||
}
|
||||
b := packet.Data
|
||||
if util.IsIPv4(packet.Data) {
|
||||
// ipv4.ParseHeader
|
||||
u.src = net.IPv4(b[12], b[13], b[14], b[15])
|
||||
u.dst = net.IPv4(b[16], b[17], b[18], b[19])
|
||||
} else if util.IsIPv6(packet.Data) {
|
||||
// ipv6.ParseHeader
|
||||
u.src = b[8:24]
|
||||
u.dst = b[24:40]
|
||||
} else {
|
||||
log.Errorf("[tun] unknown packet")
|
||||
continue
|
||||
}
|
||||
log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length)
|
||||
p.parsedConnInfo <- u
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) parseHeader() {
|
||||
var firstIPv4, firstIPv6 = true, true
|
||||
for e := range p.connInbound {
|
||||
@@ -490,7 +526,7 @@ func (p *Peer) parseHeader() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) route() {
|
||||
func (p *Peer) routePeer() {
|
||||
for e := range p.parsedConnInfo {
|
||||
if routeToAddr := p.routeNAT.RouteTo(e.dst); routeToAddr != nil {
|
||||
log.Debugf("[tun] find route: %s -> %s", e.dst, routeToAddr)
|
||||
@@ -509,7 +545,7 @@ func (p *Peer) route() {
|
||||
}
|
||||
config.LPool.Put(e.data[:])
|
||||
} else {
|
||||
p.tun.tunOutbound <- &DataElem{
|
||||
p.tunOutbound <- &DataElem{
|
||||
data: e.data,
|
||||
length: e.length,
|
||||
src: e.src,
|
||||
@@ -519,62 +555,11 @@ func (p *Peer) route() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) Start() {
|
||||
go p.readFromConn()
|
||||
for i := 0; i < p.thread; i++ {
|
||||
go p.parseHeader()
|
||||
}
|
||||
go p.route()
|
||||
}
|
||||
|
||||
func (p *Peer) Close() {
|
||||
p.conn.Close()
|
||||
}
|
||||
|
||||
func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error {
|
||||
errChan := make(chan error, 2)
|
||||
p := &Peer{
|
||||
conn: conn,
|
||||
thread: MaxThread,
|
||||
connInbound: make(chan *udpElem, MaxSize),
|
||||
parsedConnInfo: make(chan *udpElem, MaxSize),
|
||||
tun: tun,
|
||||
routeNAT: h.routeNAT,
|
||||
routeConnNAT: h.routeConnNAT,
|
||||
errChan: errChan,
|
||||
}
|
||||
|
||||
defer p.Close()
|
||||
p.Start()
|
||||
|
||||
go func() {
|
||||
for packet := range Chan {
|
||||
u := &udpElem{
|
||||
data: packet.Data[:],
|
||||
length: int(packet.DataLength),
|
||||
}
|
||||
b := packet.Data
|
||||
if util.IsIPv4(packet.Data) {
|
||||
// ipv4.ParseHeader
|
||||
u.src = net.IPv4(b[12], b[13], b[14], b[15])
|
||||
u.dst = net.IPv4(b[16], b[17], b[18], b[19])
|
||||
} else if util.IsIPv6(packet.Data) {
|
||||
// ipv6.ParseHeader
|
||||
u.src = b[8:24]
|
||||
u.dst = b[24:40]
|
||||
} else {
|
||||
log.Errorf("[tun] unknown packet")
|
||||
continue
|
||||
}
|
||||
log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length)
|
||||
p.parsedConnInfo <- u
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for e := range tun.tunInbound {
|
||||
if addr := h.routeNAT.RouteTo(e.dst); addr != nil {
|
||||
func (p *Peer) routeTUN() {
|
||||
for e := range p.tunInbound {
|
||||
if addr := p.routeNAT.RouteTo(e.dst); addr != nil {
|
||||
log.Debugf("[tun] find route: %s -> %s", e.dst, addr)
|
||||
_, err := conn.WriteTo(e.data[:e.length], addr)
|
||||
_, err := p.conn.WriteTo(e.data[:e.length], addr)
|
||||
config.LPool.Put(e.data[:])
|
||||
if err != nil {
|
||||
log.Debugf("[tun] can not route: %s -> %s", e.dst, addr)
|
||||
@@ -595,10 +580,41 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac
|
||||
log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Peer) Start() {
|
||||
go p.readFromConn()
|
||||
go p.readFromTCPConn()
|
||||
for i := 0; i < p.thread; i++ {
|
||||
go p.parseHeader()
|
||||
}
|
||||
go p.routePeer()
|
||||
go p.routeTUN()
|
||||
}
|
||||
|
||||
func (p *Peer) Close() {
|
||||
p.conn.Close()
|
||||
}
|
||||
|
||||
func transportTun(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, nat *NAT, connNAT *sync.Map) error {
|
||||
p := &Peer{
|
||||
conn: packetConn,
|
||||
thread: MaxThread,
|
||||
connInbound: make(chan *udpElem, MaxSize),
|
||||
parsedConnInfo: make(chan *udpElem, MaxSize),
|
||||
tunInbound: tunInbound,
|
||||
tunOutbound: tunOutbound,
|
||||
routeNAT: nat,
|
||||
routeConnNAT: connNAT,
|
||||
errChan: make(chan error, 2),
|
||||
}
|
||||
|
||||
defer p.Close()
|
||||
p.Start()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
case err := <-p.errChan:
|
||||
log.Errorf(err.Error())
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/pkg/config"
|
||||
)
|
||||
|
||||
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
|
||||
d := &Device{
|
||||
tun: tun,
|
||||
thread: MaxThread,
|
||||
tunInboundRaw: make(chan *DataElem, MaxSize),
|
||||
tunInbound: make(chan *DataElem, MaxSize),
|
||||
tunOutbound: make(chan *DataElem, MaxSize),
|
||||
chExit: h.chExit,
|
||||
}
|
||||
defer d.Close()
|
||||
d.Start()
|
||||
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote)
|
||||
if err != nil {
|
||||
log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err)
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < MaxConn; i++ {
|
||||
go func() {
|
||||
for {
|
||||
func() {
|
||||
cancel, cancelFunc := context.WithCancel(ctx)
|
||||
defer cancelFunc()
|
||||
var packetConn net.PacketConn
|
||||
defer func() {
|
||||
if packetConn != nil {
|
||||
_ = packetConn.Close()
|
||||
}
|
||||
}()
|
||||
if !h.chain.IsEmpty() {
|
||||
cc, errs := h.chain.DialContext(cancel)
|
||||
if errs != nil {
|
||||
log.Debug(errs)
|
||||
time.Sleep(time.Second * 5)
|
||||
return
|
||||
}
|
||||
var ok bool
|
||||
if packetConn, ok = cc.(net.PacketConn); !ok {
|
||||
errs = errors.New("not a packet connection")
|
||||
log.Errorf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, errs)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
var errs error
|
||||
var lc net.ListenConfig
|
||||
packetConn, errs = lc.ListenPacket(cancel, "udp", "")
|
||||
if errs != nil {
|
||||
log.Error(errs)
|
||||
return
|
||||
}
|
||||
}
|
||||
errs := h.transportTunCli(cancel, d, packetConn, remoteAddr)
|
||||
if errs != nil {
|
||||
log.Debugf("[tun] %s: %v", tun.LocalAddr(), errs)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
select {
|
||||
case s := <-h.chExit:
|
||||
log.Error(s)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tunHandler) transportTunCli(ctx context.Context, d *Device, conn net.PacketConn, remoteAddr net.Addr) error {
|
||||
errChan := make(chan error, 2)
|
||||
defer conn.Close()
|
||||
|
||||
go func() {
|
||||
for e := range d.tunInbound {
|
||||
if e.src.Equal(e.dst) {
|
||||
d.tunOutbound <- e
|
||||
continue
|
||||
}
|
||||
_, err := conn.WriteTo(e.data[:e.length], remoteAddr)
|
||||
config.LPool.Put(e.data[:])
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
b := config.LPool.Get().([]byte)
|
||||
n, _, err := conn.ReadFrom(b[:])
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
d.tunOutbound <- &DataElem{data: b[:], length: n}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
107
pkg/core/tunhandlerclient.go
Normal file
107
pkg/core/tunhandlerclient.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/pkg/config"
|
||||
)
|
||||
|
||||
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
|
||||
remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote)
|
||||
if err != nil {
|
||||
log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err)
|
||||
return
|
||||
}
|
||||
|
||||
device := &Device{
|
||||
tun: tun,
|
||||
thread: MaxThread,
|
||||
tunInboundRaw: make(chan *DataElem, MaxSize),
|
||||
tunInbound: make(chan *DataElem, MaxSize),
|
||||
tunOutbound: make(chan *DataElem, MaxSize),
|
||||
chExit: h.chExit,
|
||||
}
|
||||
device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) {
|
||||
for {
|
||||
packetConn, err := getRemotePacketConn(ctx, h.chain)
|
||||
if err != nil {
|
||||
log.Debugf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, err)
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
err = transportTunClient(ctx, tunInbound, tunOutbound, packetConn, remoteAddr)
|
||||
if err != nil {
|
||||
log.Debugf("[tun] %s: %v", tun.LocalAddr(), err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
defer device.Close()
|
||||
device.Start(ctx)
|
||||
}
|
||||
|
||||
func getRemotePacketConn(ctx context.Context, chain *Chain) (net.PacketConn, error) {
|
||||
var packetConn net.PacketConn
|
||||
if !chain.IsEmpty() {
|
||||
cc, err := chain.DialContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ok bool
|
||||
if packetConn, ok = cc.(net.PacketConn); !ok {
|
||||
return nil, errors.New("not a packet connection")
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
var lc net.ListenConfig
|
||||
packetConn, err = lc.ListenPacket(ctx, "udp", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return packetConn, nil
|
||||
}
|
||||
|
||||
func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, remoteAddr net.Addr) error {
|
||||
errChan := make(chan error, 2)
|
||||
defer packetConn.Close()
|
||||
|
||||
go func() {
|
||||
for e := range tunInbound {
|
||||
if e.src.Equal(e.dst) {
|
||||
tunOutbound <- e
|
||||
continue
|
||||
}
|
||||
_, err := packetConn.WriteTo(e.data[:e.length], remoteAddr)
|
||||
config.LPool.Put(e.data[:])
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
b := config.LPool.Get().([]byte)
|
||||
n, _, err := packetConn.ReadFrom(b[:])
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
tunOutbound <- &DataElem{data: b[:], length: n}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -31,7 +31,7 @@ func attachContainer(ctx context.Context, dockerCli command.Cli, errCh *chan err
|
||||
|
||||
resp, errAttach := dockerCli.Client().ContainerAttach(ctx, containerID, options)
|
||||
if errAttach != nil {
|
||||
return nil, errAttach
|
||||
return nil, fmt.Errorf("failed to attach to container: %s, err: %v", containerID, errAttach)
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user