Merge pull request #567 from kubenetworks/refactor/refactor-code

refactor: refactor code
This commit is contained in:
naison
2025-04-29 22:16:59 +08:00
committed by GitHub
13 changed files with 312 additions and 116 deletions

View File

@@ -45,9 +45,9 @@ spec:
ip6tables -P FORWARD ACCEPT
iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE
ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801"
{{- else }}
- kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true
- kubevpn server -l "tcp://:10800" -l "gtcp://:10801"
{{- end }}
command:
- /bin/sh

View File

@@ -42,7 +42,7 @@ func CmdDaemon(cmdutil.Factory) *cobra.Command {
} else {
go util.StartupPProf(config.PProfPort)
}
return initLogfile(action.GetDaemonLogPath())
return initLogfile(action.GetDaemonLogPath(opt.IsSudo))
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
defer opt.Stop()

View File

@@ -45,24 +45,24 @@ const (
VolumeSyncthing = "syncthing"
// innerIPv4Pool is used as tun ip
// 198.19.0.0/16 network is part of the 198.18.0.0/15 (reserved for benchmarking).
// 198.19.0.0/16 network is part of the 198.18.0.0/15 (reserved for benchmarking).
// https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
// so we split it into 2 parts: 198.18.0.0/15 --> [198.19.0.0/16, 198.19.0.0/16]
innerIPv4Pool = "198.19.0.100/16"
// 原因在docker环境中设置docker的 gateway 和 subnet不能 inner 的冲突,也不能和 docker的 172.17 冲突
// 不然的话,请求会不通的
// 解决的问题:在 k8s 中的 名叫 kubernetes 的 service ip 为
// ➜ ~ kubectl get service kubernetes
//NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
//kubernetes ClusterIP 172.17.0.1 <none> 443/TCP 190d
//
// ➜ ~ docker network inspect bridge | jq '.[0].IPAM.Config'
//[
// {
// "Subnet": "172.17.0.0/16",
// "Gateway": "172.17.0.1"
// }
//]
// 如果不创建 network那么是无法请求到 这个 kubernetes 的 service 的
/*
reasondocker use 172.17.0.0/16 network conflict with k8s service kubernetes
➜ ~ kubectl get service kubernetes
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 172.17.0.1 <none> 443/TCP 190d
➜ ~ docker network inspect bridge | jq '.[0].IPAM.Config'
[
{
"Subnet": "172.17.0.0/16",
"Gateway": "172.17.0.1"
}
]
*/
dockerInnerIPv4Pool = "198.18.0.100/16"
// 2001:2::/64 network is part of the 2001:2::/48 (reserved for benchmarking)
@@ -99,9 +99,6 @@ const (
// hosts entry key word
HostsKeyWord = "# Add by KubeVPN"
GHCR_IMAGE_REGISTRY = "ghcr.io"
DOCKER_IMAGE_REGISTRY = "docker.io"
)
var (

View File

@@ -18,9 +18,12 @@ const (
PidPath = "daemon.pid"
SudoPidPath = "sudo_daemon.pid"
LogFile = "daemon.log"
UserLogFile = "user_daemon.log"
SudoLogFile = "root_daemon.log"
ConfigFile = "config.yaml"
TmpDir = "tmp"
)
//go:embed config.yaml
@@ -51,6 +54,14 @@ func init() {
if err != nil {
panic(err)
}
err = os.MkdirAll(GetTempPath(), 0755)
if err != nil {
panic(err)
}
err = os.Chmod(GetTempPath(), 0755)
if err != nil {
panic(err)
}
path := filepath.Join(HomePath, ConfigFile)
_, err = os.Stat(path)
@@ -85,3 +96,7 @@ func GetSyncthingPath() string {
func GetConfigFilePath() string {
return filepath.Join(HomePath, ConfigFile)
}
func GetTempPath() string {
return filepath.Join(HomePath, TmpDir)
}

View File

@@ -38,23 +38,24 @@ type ClientDevice struct {
tunInbound chan *Packet
tunOutbound chan *Packet
errChan chan error
remote *net.UDPAddr
forward *Forwarder
}
func (d *ClientDevice) handlePacket(ctx context.Context, forward *Forwarder) {
for ctx.Err() == nil {
conn, err := forwardConn(ctx, forward)
if err != nil {
plog.G(ctx).Errorf("Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), forward.node.Remote, err)
time.Sleep(time.Second * 1)
continue
}
err = handlePacketClient(ctx, d.tunInbound, d.tunOutbound, conn)
if err != nil {
plog.G(ctx).Errorf("Failed to transport data to remote %s: %v", conn.RemoteAddr(), err)
}
func() {
defer time.Sleep(time.Second * 2)
conn, err := forwardConn(ctx, forward)
if err != nil {
plog.G(ctx).Errorf("Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), forward.node.Remote, err)
return
}
defer conn.Close()
err = handlePacketClient(ctx, d.tunInbound, d.tunOutbound, conn)
if err != nil {
plog.G(ctx).Errorf("Failed to transport data to remote %s: %v", conn.RemoteAddr(), err)
return
}
}()
}
}
@@ -68,7 +69,6 @@ func forwardConn(ctx context.Context, forwarder *Forwarder) (net.Conn, error) {
func handlePacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbound chan<- *Packet, conn net.Conn) error {
errChan := make(chan error, 2)
defer conn.Close()
go func() {
defer util.HandleCrash()
@@ -76,6 +76,7 @@ func handlePacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbo
_, err := conn.Write(packet.data[:packet.length])
config.LPool.Put(packet.data[:])
if err != nil {
plog.G(ctx).Errorf("Failed to write packet to remote: %v", err)
util.SafeWrite(errChan, errors.Wrap(err, "failed to write packet to remote"))
return
}
@@ -89,6 +90,7 @@ func handlePacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbo
n, err := conn.Read(buf[:])
if err != nil {
config.LPool.Put(buf[:])
plog.G(ctx).Errorf("Failed to read packet from remote: %v", err)
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", conn.RemoteAddr())))
return
}

View File

@@ -1,7 +1,6 @@
package action
import (
"bufio"
"io"
"log"
"os"
@@ -12,35 +11,73 @@ import (
)
func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
path := GetDaemonLogPath()
lines, err2 := countLines(path)
if err2 != nil {
return err2
}
// only show latest N lines
if req.Lines < 0 {
lines = -req.Lines
} else {
lines -= req.Lines
}
config := tail.Config{Follow: req.Follow, ReOpen: false, MustExist: true, Logger: log.New(io.Discard, "", log.LstdFlags)}
if !req.Follow {
// FATAL -- cannot set ReOpen without Follow.
config.ReOpen = false
}
file, err := tail.TailFile(path, config)
line := int64(max(req.Lines, -req.Lines))
sudoLine, sudoSize, err := seekToLastLine(GetDaemonLogPath(true), line)
if err != nil {
return err
}
defer file.Stop()
userLine, userSize, err := seekToLastLine(GetDaemonLogPath(false), line)
if err != nil {
return err
}
err = recent(resp, sudoLine, userLine)
if err != nil {
return err
}
if req.Follow {
err = tee(resp, sudoSize, userSize)
if err != nil {
return err
}
}
return nil
}
func tee(resp rpc.Daemon_LogsServer, sudoLine int64, userLine int64) error {
// FATAL -- cannot set ReOpen without Follow.
sudoConfig := tail.Config{
Follow: true,
ReOpen: true,
MustExist: true,
Logger: log.New(io.Discard, "", log.LstdFlags),
Location: &tail.SeekInfo{Offset: sudoLine, Whence: io.SeekStart},
}
userConfig := tail.Config{
Follow: true,
ReOpen: true,
MustExist: true,
Logger: log.New(io.Discard, "", log.LstdFlags),
Location: &tail.SeekInfo{Offset: userLine, Whence: io.SeekStart},
}
sudoFile, err := tail.TailFile(GetDaemonLogPath(true), sudoConfig)
if err != nil {
return err
}
defer sudoFile.Stop()
userFile, err := tail.TailFile(GetDaemonLogPath(false), userConfig)
if err != nil {
return err
}
defer userFile.Stop()
for {
select {
case <-resp.Context().Done():
return nil
case line, ok := <-file.Lines:
case line, ok := <-userFile.Lines:
if !ok {
return nil
}
if line.Err != nil {
return line.Err
}
err = resp.Send(&rpc.LogResponse{Message: "[USER] " + line.Text + "\n"})
if err != nil {
return err
}
case line, ok := <-sudoFile.Lines:
if !ok {
return nil
}
@@ -48,11 +85,7 @@ func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
return err
}
if lines--; lines >= 0 {
continue
}
err = resp.Send(&rpc.LogResponse{Message: line.Text + "\n"})
err = resp.Send(&rpc.LogResponse{Message: "[ROOT] " + line.Text + "\n"})
if err != nil {
return err
}
@@ -60,23 +93,115 @@ func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
}
}
func countLines(filename string) (int32, error) {
func recent(resp rpc.Daemon_LogsServer, sudoLine int64, userLine int64) error {
sudoConfig := tail.Config{
Follow: false,
ReOpen: false,
MustExist: true,
Logger: log.New(io.Discard, "", log.LstdFlags),
Location: &tail.SeekInfo{Offset: sudoLine, Whence: io.SeekStart},
}
userConfig := tail.Config{
Follow: false,
ReOpen: false,
MustExist: true,
Logger: log.New(io.Discard, "", log.LstdFlags),
Location: &tail.SeekInfo{Offset: userLine, Whence: io.SeekStart},
}
sudoFile, err := tail.TailFile(GetDaemonLogPath(true), sudoConfig)
if err != nil {
return err
}
defer sudoFile.Stop()
userFile, err := tail.TailFile(GetDaemonLogPath(false), userConfig)
if err != nil {
return err
}
defer userFile.Stop()
userOut:
for {
select {
case <-resp.Context().Done():
return nil
case line, ok := <-userFile.Lines:
if !ok {
break userOut
}
if line.Err != nil {
return line.Err
}
err = resp.Send(&rpc.LogResponse{Message: "[USER] " + line.Text + "\n"})
if err != nil {
return err
}
}
}
sudoOut:
for {
select {
case <-resp.Context().Done():
return nil
case line, ok := <-sudoFile.Lines:
if !ok {
break sudoOut
}
if line.Err != nil {
return line.Err
}
err = resp.Send(&rpc.LogResponse{Message: "[ROOT] " + line.Text + "\n"})
if err != nil {
return err
}
}
}
return nil
}
func seekToLastLine(filename string, lines int64) (int64, int64, error) {
file, err := os.Open(filename)
if err != nil {
return 0, err
return 0, 0, err
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := int32(0)
for scanner.Scan() {
lineCount++
stat, err := file.Stat()
if err != nil {
return 0, 0, err
}
size := stat.Size()
bufSize := int64(4096)
lineCount := int64(0)
remaining := size
if err = scanner.Err(); err != nil {
return 0, err
for remaining > 0 {
chunkSize := bufSize
if remaining < bufSize {
chunkSize = remaining
}
pos := remaining - chunkSize
_, err = file.Seek(pos, io.SeekStart)
if err != nil {
return 0, 0, err
}
buf := make([]byte, chunkSize)
_, err = file.Read(buf)
if err != nil {
return 0, 0, err
}
for i := len(buf) - 1; i >= 0; i-- {
if buf[i] == '\n' {
lineCount++
if lineCount > lines {
targetPos := pos + int64(i) + 1
return targetPos, size, nil
}
}
}
remaining -= chunkSize
}
return lineCount, nil
return 0, 0, nil
}

View File

@@ -34,6 +34,9 @@ type Server struct {
ID string
}
func GetDaemonLogPath() string {
return filepath.Join(config.DaemonPath, config.LogFile)
func GetDaemonLogPath(isSudo bool) string {
if isSudo {
return filepath.Join(config.DaemonPath, config.SudoLogFile)
}
return filepath.Join(config.DaemonPath, config.UserLogFile)
}

View File

@@ -44,7 +44,7 @@ type SvrOption struct {
func (o *SvrOption) Start(ctx context.Context) error {
l := &lumberjack.Logger{
Filename: action.GetDaemonLogPath(),
Filename: action.GetDaemonLogPath(o.IsSudo),
MaxSize: 100,
MaxAge: 3,
MaxBackups: 3,
@@ -63,7 +63,7 @@ func (o *SvrOption) Start(ctx context.Context) error {
plog.L.SetOutput(l)
rest.SetDefaultWarningHandler(rest.NoWarnings{})
// every day 00:00:00 rotate log
go rotateLog(l, o.IsSudo)
go rotateLog(l)
sockPath := config.GetSockPath(o.IsSudo)
err := os.Remove(sockPath)
@@ -227,11 +227,8 @@ func writePIDToFile(isSudo bool) error {
// let daemon process to Rotate log. create new log file
// sudo daemon process then use new log file
func rotateLog(l *lumberjack.Logger, isSudo bool) {
func rotateLog(l *lumberjack.Logger) {
sec := time.Duration(0)
if isSudo {
sec = 2 * time.Second
}
for {
nowTime := time.Now()
nowTimeStr := nowTime.Format("2006-01-02")
@@ -239,10 +236,6 @@ func rotateLog(l *lumberjack.Logger, isSudo bool) {
next := t2.AddDate(0, 0, 1).Add(sec)
after := next.UnixNano() - nowTime.UnixNano()
<-time.After(time.Duration(after) * time.Nanosecond)
if isSudo {
_ = l.Close()
} else {
_ = l.Rotate()
}
_ = l.Rotate()
}
}

View File

@@ -237,24 +237,15 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
plog.G(ctx).Errorf("Add extra node IP failed: %v", err)
return
}
var rawTCPForwardPort, gvisorTCPForwardPort, gvisorUDPForwardPort int
rawTCPForwardPort, err = util.GetAvailableTCPPortOrDie()
if err != nil {
return err
}
gvisorTCPForwardPort, err = util.GetAvailableTCPPortOrDie()
if err != nil {
return err
}
gvisorUDPForwardPort, err = util.GetAvailableTCPPortOrDie()
var tcpForwardPort int
tcpForwardPort, err = util.GetAvailableTCPPortOrDie()
if err != nil {
return err
}
plog.G(ctx).Info("Forwarding port...")
portPair := []string{
fmt.Sprintf("%d:10800", rawTCPForwardPort),
fmt.Sprintf("%d:10801", gvisorTCPForwardPort),
fmt.Sprintf("%d:10802", gvisorUDPForwardPort),
portPair := []string{fmt.Sprintf("%d:10800", tcpForwardPort)}
if c.Engine == config.EngineGvisor {
portPair = []string{fmt.Sprintf("%d:10801", tcpForwardPort)}
}
if err = c.portForward(c.ctx, portPair); err != nil {
return
@@ -262,10 +253,7 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
if util.IsWindows() {
driver.InstallWireGuardTunDriver()
}
forward := fmt.Sprintf("tcp://127.0.0.1:%d", rawTCPForwardPort)
if c.Engine == config.EngineGvisor {
forward = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort)
}
forward := fmt.Sprintf("tcp://127.0.0.1:%d", tcpForwardPort)
if err = c.startLocalTunServer(c.ctx, forward, isLite); err != nil {
plog.G(ctx).Errorf("Start local tun service failed: %v", err)
return
@@ -312,14 +300,15 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
}
pod := podList[0]
// add route in case of don't have permission to watch pod, but pod recreated ip changed, so maybe this ip can not visit
_ = c.addRoute(pod.Status.PodIP)
_ = c.addRoute(util.GetPodIP(pod)...)
childCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var readyChan = make(chan struct{})
podName := pod.GetName()
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0])
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[0], ":")[0])
go c.heartbeats(childCtx, util.GetPodIP(pod)...)
if *first {
go func() {
select {
@@ -1224,3 +1213,30 @@ func (c *ConnectOptions) IsMe(ns, uid string, headers map[string]string) bool {
func (c *ConnectOptions) ProxyResources() ProxyList {
return c.proxyWorkloads
}
func (c *ConnectOptions) heartbeats(ctx context.Context, ips ...string) {
var dstIPv4, dstIPv6 net.IP
for _, podIP := range ips {
ip := net.ParseIP(podIP)
if ip == nil {
continue
}
if ip.To4() != nil {
dstIPv4 = ip
} else {
dstIPv6 = ip
}
}
ticker := time.NewTicker(config.KeepAliveTime)
defer ticker.Stop()
for ; ctx.Err() == nil; <-ticker.C {
if dstIPv4 != nil && c.localTunIPv4 != nil {
util.Ping(ctx, c.localTunIPv4.IP.String(), dstIPv4.String())
}
if dstIPv6 != nil && c.localTunIPv6 != nil {
util.Ping(ctx, c.localTunIPv6.IP.String(), dstIPv6.String())
}
}
}

View File

@@ -363,7 +363,7 @@ func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 st
Args: []string{util.If(
gvisor,
`
kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`,
kubevpn server -l "tcp://:10800" -l "gtcp://:10801"`,
`
echo 1 > /proc/sys/net/ipv4/ip_forward
echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6
@@ -375,7 +375,7 @@ iptables -P FORWARD ACCEPT
ip6tables -P FORWARD ACCEPT
iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE
ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`,
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801"`,
)},
EnvFrom: []v1.EnvFromSource{{
SecretRef: &v1.SecretEnvSource{

View File

@@ -333,9 +333,9 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
return
}
if print {
plog.G(ctx).Infof("Use temporary kubeconfig: %s", path)
plog.G(ctx).Infof("Use temp kubeconfig: %s", path)
} else {
plog.G(ctx).Debugf("Use temporary kubeconfig: %s", path)
plog.G(ctx).Debugf("Use temp kubeconfig: %s", path)
}
return
}

View File

@@ -3,6 +3,7 @@ package util
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"os"
@@ -128,7 +129,12 @@ func GetAPIServerFromKubeConfigBytes(kubeconfigBytes []byte) *net.IPNet {
}
func ConvertToTempKubeconfigFile(kubeconfigBytes []byte) (string, error) {
temp, err := os.CreateTemp("", "*.kubeconfig")
pattern := "*.kubeconfig"
cluster, ns, _ := GetCluster(kubeconfigBytes)
if cluster != "" {
pattern = fmt.Sprintf("%s_%s_%s", cluster, ns, pattern)
}
temp, err := os.CreateTemp(config.GetTempPath(), pattern)
if err != nil {
return "", err
}
@@ -147,6 +153,32 @@ func ConvertToTempKubeconfigFile(kubeconfigBytes []byte) (string, error) {
return temp.Name(), nil
}
func GetCluster(kubeConfigBytes []byte) (cluster string, ns string, err error) {
var clientConfig clientcmd.ClientConfig
clientConfig, err = clientcmd.NewClientConfigFromBytes(kubeConfigBytes)
if err != nil {
return
}
var rawConfig api.Config
rawConfig, err = clientConfig.RawConfig()
if err != nil {
return
}
if err = api.FlattenConfig(&rawConfig); err != nil {
return
}
if rawConfig.Contexts == nil {
return
}
kubeContext := rawConfig.Contexts[rawConfig.CurrentContext]
if kubeContext == nil {
return
}
cluster = kubeContext.Cluster
ns = kubeContext.Namespace
return
}
func InitFactory(kubeconfigBytes string, ns string) cmdutil.Factory {
configFlags := genericclioptions.NewConfigFlags(true)
configFlags.WrapConfigFn = func(c *rest.Config) *rest.Config {

View File

@@ -382,7 +382,7 @@ func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName
}
}
func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyChan chan struct{}, localGvisorTCPPort string) {
func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyChan chan struct{}, localRandomTCPPort string) {
defer cancelFunc()
ticker := time.NewTicker(time.Second * 60)
defer ticker.Stop()
@@ -398,10 +398,10 @@ func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyCh
for ctx.Err() == nil {
var lc net.ListenConfig
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localGvisorTCPPort))
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localRandomTCPPort))
if err == nil {
_ = conn.Close()
plog.G(ctx).Debugf("Local port: %s is free", localGvisorTCPPort)
plog.G(ctx).Debugf("Local port: %s is free", localRandomTCPPort)
return
}
time.Sleep(time.Second * 1)
@@ -537,3 +537,16 @@ func DetectPodSupportIPv6(ctx context.Context, factory util.Factory, namespace s
}
return disableIPv6 == 0, nil
}
func GetPodIP(pod corev1.Pod) []string {
var result = sets.New[string]().Insert()
for _, p := range pod.Status.PodIPs {
if net.ParseIP(p.IP) != nil {
result.Insert(p.IP)
}
}
if net.ParseIP(pod.Status.PodIP) != nil {
result.Insert(pod.Status.PodIP)
}
return result.UnsortedList()
}