diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e6cf4471..c049c5ae 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -82,12 +82,6 @@ jobs: docker pull naison/kubevpn:test || true docker pull naison/kubevpn:latest || true docker pull naison/kubevpn:${tag} || true - docker pull naison/kubevpn-mesh:test || true - docker pull naison/kubevpn-mesh:latest || true - docker pull naison/kubevpn-mesh:${tag} || true - docker pull naison/envoy-xds-server:test || true - docker pull naison/envoy-xds-server:latest || true - docker pull naison/envoy-xds-server:${tag} || true - name: Install minikube run: | diff --git a/Makefile b/Makefile index 98c48925..a818c882 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,6 @@ container: ############################ build local .PHONY: container-local container-local: kubevpn-linux-amd64 - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./bin/envoy-xds-server ./cmd/mesh docker build --platform linux/amd64 -t ${IMAGE} -f $(BUILD_DIR)/local.Dockerfile . docker push ${IMAGE} docker tag ${IMAGE} ${IMAGE_DEFAULT} diff --git a/build/Dockerfile b/build/Dockerfile index fe50b4cd..44d99ced 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -9,8 +9,6 @@ WORKDIR /go/src/$BASE RUN go env -w GO111MODULE=on && go env -w GOPROXY=https://goproxy.cn,direct RUN make kubevpn-linux-amd64 -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/envoy-xds-server /go/src/$BASE/cmd/mesh - FROM ubuntu:latest @@ -22,5 +20,4 @@ RUN apt-get clean && apt-get update && apt-get install -y wget dnsutils vim curl WORKDIR /app COPY --from=builder /go/src/$BASE/bin/kubevpn-linux-amd64 /usr/local/bin/kubevpn -COPY --from=builder /go/src/$BASE/bin/envoy-xds-server /bin/envoy-xds-server COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy \ No newline at end of file diff --git a/build/local.Dockerfile b/build/local.Dockerfile index 758d6529..d8c69d2d 100644 --- a/build/local.Dockerfile +++ b/build/local.Dockerfile @@ -9,5 +9,4 @@ RUN apt-get clean && apt-get update && apt-get install -y wget dnsutils vim curl WORKDIR /app COPY bin/kubevpn-linux-amd64 /usr/local/bin/kubevpn -COPY bin/envoy-xds-server /bin/envoy-xds-server COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy \ No newline at end of file diff --git a/cmd/kubevpn/cmds/controlplane.go b/cmd/kubevpn/cmds/controlplane.go new file mode 100644 index 00000000..3e2ce059 --- /dev/null +++ b/cmd/kubevpn/cmds/controlplane.go @@ -0,0 +1,31 @@ +package cmds + +import ( + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/wencaiwulue/kubevpn/pkg/config" + "github.com/wencaiwulue/kubevpn/pkg/controlplane" + "github.com/wencaiwulue/kubevpn/pkg/util" +) + +var ( + watchDirectoryFilename string + port uint = 9002 +) + +func init() { + controlPlaneCmd.Flags().StringVarP(&watchDirectoryFilename, "watchDirectoryFilename", "w", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files") + controlPlaneCmd.Flags().BoolVar(&config.Debug, "debug", false, "true/false") + RootCmd.AddCommand(controlPlaneCmd) +} + +var controlPlaneCmd = &cobra.Command{ + Use: "control-plane", + Short: "Control-plane is a envoy xds server", + Long: `Control-plane is a envoy xds server, distribute envoy route configuration`, + Run: func(cmd *cobra.Command, args []string) { + util.InitLogger(config.Debug) + controlplane.Main(watchDirectoryFilename, port, log.StandardLogger()) + }, +} diff --git a/cmd/kubevpn/main.go b/cmd/kubevpn/main.go index a87df692..4bf48bc9 100644 --- a/cmd/kubevpn/main.go +++ b/cmd/kubevpn/main.go @@ -1,9 +1,9 @@ package main import ( - "github.com/wencaiwulue/kubevpn/cmd/kubevpn/cmds" - _ "net/http/pprof" + + "github.com/wencaiwulue/kubevpn/cmd/kubevpn/cmds" ) func main() { diff --git a/cmd/mesh/main.go b/cmd/mesh/main.go deleted file mode 100644 index 8a534349..00000000 --- a/cmd/mesh/main.go +++ /dev/null @@ -1,56 +0,0 @@ -package main - -import ( - "context" - "flag" - - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" - log "github.com/sirupsen/logrus" - - "github.com/wencaiwulue/kubevpn/pkg/controlplane" - "github.com/wencaiwulue/kubevpn/pkg/util" -) - -var ( - logger *log.Logger - watchDirectoryFileName string - port uint = 9002 -) - -func init() { - logger = log.New() - log.SetLevel(log.DebugLevel) - log.SetReportCaller(true) - log.SetFormatter(&util.Format{}) - flag.StringVar(&watchDirectoryFileName, "watchDirectoryFileName", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files") - flag.Parse() -} - -func main() { - snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger) - proc := controlplane.NewProcessor(snapshotCache, logger) - - go func() { - ctx := context.Background() - server := serverv3.NewServer(ctx, snapshotCache, nil) - controlplane.RunServer(ctx, server, port) - }() - - notifyCh := make(chan controlplane.NotifyMessage, 100) - - notifyCh <- controlplane.NotifyMessage{ - Operation: controlplane.Create, - FilePath: watchDirectoryFileName, - } - - go controlplane.Watch(watchDirectoryFileName, notifyCh) - - for { - select { - case msg := <-notifyCh: - log.Infof("path: %s, event: %v", msg.FilePath, msg.Operation) - proc.ProcessFile(msg) - } - } -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 8d46075e..d9aeb893 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -13,8 +13,7 @@ const ( KeyDHCP = "DHCP" KeyEnvoy = "ENVOY_CONFIG" KeyClusterIPv4POOLS = "IPv4_POOLS" - // config map annotation - AnnoRefCount = "ref-count" + KeyRefCount = "REF_COUNT" // container name ContainerSidecarEnvoyProxy = "envoy-proxy" diff --git a/pkg/controlplane/main.go b/pkg/controlplane/main.go new file mode 100644 index 00000000..cfe06370 --- /dev/null +++ b/pkg/controlplane/main.go @@ -0,0 +1,48 @@ +package controlplane + +import ( + "context" + "fmt" + "github.com/fsnotify/fsnotify" + + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" + log "github.com/sirupsen/logrus" +) + +func Main(filename string, port uint, logger *log.Logger) { + snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger) + proc := NewProcessor(snapshotCache, logger) + + go func() { + ctx := context.Background() + server := serverv3.NewServer(ctx, snapshotCache, nil) + RunServer(ctx, server, port) + }() + + notifyCh := make(chan NotifyMessage, 100) + + notifyCh <- NotifyMessage{ + Operation: Create, + FilePath: filename, + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatal(fmt.Errorf("failed to create file watcher, err: %v", err)) + } + defer watcher.Close() + err = watcher.Add(filename) + if err != nil { + log.Fatal(fmt.Errorf("failed to add file: %s to wather, err: %v", filename, err)) + } + go Watch(watcher, filename, notifyCh) + + for { + select { + case msg := <-notifyCh: + log.Infof("path: %s, event: %v", msg.FilePath, msg.Operation) + proc.ProcessFile(msg) + } + } +} diff --git a/pkg/controlplane/processor.go b/pkg/controlplane/processor.go index 7d468313..b9add029 100644 --- a/pkg/controlplane/processor.go +++ b/pkg/controlplane/processor.go @@ -3,9 +3,9 @@ package controlplane import ( "context" "fmt" - "io/ioutil" "math" "math/rand" + "os" "strconv" "github.com/envoyproxy/go-control-plane/pkg/cache/types" @@ -78,7 +78,7 @@ func (p *Processor) ProcessFile(file NotifyMessage) { func ParseYaml(file string) ([]*Virtual, error) { var virtualList = make([]*Virtual, 0) - yamlFile, err := ioutil.ReadFile(file) + yamlFile, err := os.ReadFile(file) if err != nil { return nil, fmt.Errorf("Error reading YAML file: %s\n", err) } diff --git a/pkg/controlplane/watcher.go b/pkg/controlplane/watcher.go index e398d57c..2c004c40 100644 --- a/pkg/controlplane/watcher.go +++ b/pkg/controlplane/watcher.go @@ -20,17 +20,7 @@ type NotifyMessage struct { FilePath string } -func Watch(directory string, notifyCh chan<- NotifyMessage) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Fatal(err) - } - defer watcher.Close() - err = watcher.Add(directory) - if err != nil { - log.Fatal(err) - } - +func Watch(watcher *fsnotify.Watcher, filename string, notifyCh chan<- NotifyMessage) { for { select { case event, ok := <-watcher.Events: @@ -63,7 +53,7 @@ func Watch(directory string, notifyCh chan<- NotifyMessage) { case <-time.Tick(time.Second * 3): notifyCh <- NotifyMessage{ Operation: Modify, - FilePath: directory, + FilePath: filename, } } } diff --git a/pkg/handler/cleaner.go b/pkg/handler/cleaner.go index 066e4b08..b7b8791c 100644 --- a/pkg/handler/cleaner.go +++ b/pkg/handler/cleaner.go @@ -2,7 +2,6 @@ package handler import ( "context" - "encoding/json" "fmt" "os" "os/signal" @@ -10,6 +9,7 @@ import ( "syscall" log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -43,7 +43,16 @@ func (c *ConnectOptions) addCleanUpResourceHandler(clientset *kubernetes.Clients } } _ = clientset.CoreV1().Pods(namespace).Delete(context.Background(), config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) - cleanupIfRefCountIsZero(clientset, namespace, config.ConfigMapPodTrafficManager) + var count int + count, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, -1) + if err == nil { + // if ref-count is less than zero or equals to zero, means nobody is using this traffic pod, so clean it + if count <= 0 { + cleanup(clientset, namespace, config.ConfigMapPodTrafficManager) + } + } else { + log.Error(err) + } log.Info("clean up successful") os.Exit(0) }() @@ -57,67 +66,67 @@ func Cleanup(s os.Signal) { } // vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99 -func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, increment int) { - err := retry.OnError( +func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) { + err = retry.OnError( retry.DefaultRetry, - func(err error) bool { return !k8serrors.IsNotFound(err) }, - func() error { - cm, err := configMapInterface.Get(context.Background(), name, v1.GetOptions{}) + func(err error) bool { + return !k8serrors.IsNotFound(err) + }, + func() (err error) { + var cm *corev1.ConfigMap + cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{}) if err != nil { - log.Errorf("update ref-count failed, increment: %d, error: %v", increment, err) - return err + if k8serrors.IsNotFound(err) { + return err + } + err = fmt.Errorf("update ref-count failed, increment: %d, error: %v", increment, err) + return } - curCount := 0 - if ref := cm.GetAnnotations()[config.AnnoRefCount]; len(ref) > 0 { - curCount, err = strconv.Atoi(ref) + curCount, _ := strconv.Atoi(cm.Data[config.KeyRefCount]) + var newVal = curCount + increment + if newVal < 0 { + newVal = 0 } - p, _ := json.Marshal([]interface{}{ - map[string]interface{}{ - "op": "replace", - "path": fmt.Sprintf("/metadata/annotations/%s", config.AnnoRefCount), - "value": strconv.Itoa(curCount + increment), - }, - }) - _, err = configMapInterface.Patch(context.Background(), name, types.JSONPatchType, p, v1.PatchOptions{}) - return err + p := []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(newVal))) + _, err = configMapInterface.Patch(context.Background(), name, types.MergePatchType, p, v1.PatchOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return err + } + err = fmt.Errorf("update ref count error, error: %v", err) + return + } + return }) if err != nil { - log.Errorf("update ref count error, error: %v", err) - } else { - log.Info("update ref count successfully") + return } + log.Info("update ref count successfully") + var cm *corev1.ConfigMap + cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{}) + if err != nil { + err = fmt.Errorf("failed to get cm: %s, err: %v", name, err) + return + } + current, err = strconv.Atoi(cm.Data[config.KeyRefCount]) + if err != nil { + err = fmt.Errorf("failed to get ref-count, err: %v", err) + } + return } -func cleanupIfRefCountIsZero(clientset *kubernetes.Clientset, namespace, name string) { - updateRefCount(clientset.CoreV1().ConfigMaps(namespace), name, -1) - cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, v1.GetOptions{}) - if err != nil { - log.Error(err) - return - } - refCount, err := strconv.Atoi(cm.GetAnnotations()[config.AnnoRefCount]) - if err != nil { - log.Error(err) - return - } - // if ref-count is less than zero or equals to zero, means nobody is using this traffic pod, so clean it - if refCount <= 0 { - log.Info("refCount is zero, prepare to clean up resource") - // keep configmap - p := []byte( - fmt.Sprintf(`[{"op": "remove", "path": "/data/%s"}]`, config.KeyDHCP), - ) - _, err = clientset.CoreV1().ConfigMaps(namespace).Patch(context.Background(), name, types.JSONPatchType, p, v1.PatchOptions{}) - p = []byte( - fmt.Sprintf(`[{"op": "replace", "path": "/metadata/annotations/%s", "value": "0"}]`, config.AnnoRefCount), - ) - _, err = clientset.CoreV1().ConfigMaps(namespace).Patch(context.Background(), name, types.JSONPatchType, p, v1.PatchOptions{}) - options := v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)} - _ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(context.Background(), name+"."+namespace, options) - _ = clientset.RbacV1().RoleBindings(namespace).Delete(context.Background(), name, options) - _ = clientset.CoreV1().ServiceAccounts(namespace).Delete(context.Background(), name, options) - _ = clientset.RbacV1().Roles(namespace).Delete(context.Background(), name, options) - _ = clientset.CoreV1().Services(namespace).Delete(context.Background(), name, options) - _ = clientset.AppsV1().Deployments(namespace).Delete(context.Background(), name, options) - } +func cleanup(clientset *kubernetes.Clientset, namespace, name string) { + log.Info("ref-count is zero, prepare to clean up resource") + // keep configmap + p := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/data/%s"}]`, config.KeyDHCP)) + _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(context.Background(), name, types.JSONPatchType, p, v1.PatchOptions{}) + p = []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(0))) + _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(context.Background(), name, types.MergePatchType, p, v1.PatchOptions{}) + options := v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)} + _ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(context.Background(), name+"."+namespace, options) + _ = clientset.RbacV1().RoleBindings(namespace).Delete(context.Background(), name, options) + _ = clientset.CoreV1().ServiceAccounts(namespace).Delete(context.Background(), name, options) + _ = clientset.RbacV1().Roles(namespace).Delete(context.Background(), name, options) + _ = clientset.CoreV1().Services(namespace).Delete(context.Background(), name, options) + _ = clientset.AppsV1().Deployments(namespace).Delete(context.Background(), name, options) } diff --git a/pkg/handler/dhcp.go b/pkg/handler/dhcp.go index 7deaa126..6155501f 100644 --- a/pkg/handler/dhcp.go +++ b/pkg/handler/dhcp.go @@ -35,6 +35,7 @@ func NewDHCPManager(client corev1.ConfigMapInterface, namespace string, cidr *ne func (d *DHCPManager) InitDHCP() error { cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) if err == nil { + // add key envoy in case of mount not exist content if _, found := cm.Data[config.KeyEnvoy]; !found { _, err = d.client.Patch( context.Background(), @@ -49,17 +50,18 @@ func (d *DHCPManager) InitDHCP() error { } cm = &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: config.ConfigMapPodTrafficManager, - Namespace: d.namespace, - Labels: map[string]string{}, - Annotations: map[string]string{config.AnnoRefCount: "0"}, + Name: config.ConfigMapPodTrafficManager, + Namespace: d.namespace, + Labels: map[string]string{}, + }, + Data: map[string]string{ + config.KeyEnvoy: "", + config.KeyRefCount: "0", }, - Data: map[string]string{config.KeyEnvoy: ""}, } _, err = d.client.Create(context.Background(), cm, metav1.CreateOptions{}) if err != nil { - log.Errorf("create dhcp error, err: %v", err) - return err + return fmt.Errorf("create dhcp error, err: %v", err) } return nil } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index bcd2964c..7f70c2d9 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -43,8 +43,11 @@ func CreateOutboundPod(factory cmdutil.Factory, clientset *kubernetes.Clientset, if err == nil { _, err = polymorphichelpers.AttachablePodForObjectFn(factory, service, 2*time.Second) if err == nil { + _, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) + if err != nil { + return + } log.Infoln("traffic manager already exist, reuse it") - updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) return net.ParseIP(service.Spec.ClusterIP), nil } } @@ -269,8 +272,8 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TrafficManagerIP}" --debug { Name: config.ContainerSidecarControlPlane, Image: config.Image, - Command: []string{"envoy-xds-server"}, - Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"}, + Command: []string{"kubevpn"}, + Args: []string{"control-plane", "--watchDirectoryFilename", "/etc/envoy/envoy-config.yaml"}, Ports: []v1.ContainerPort{{ Name: tcp9002, ContainerPort: 9002, @@ -362,7 +365,7 @@ out: Namespace: namespace, }, Webhooks: []admissionv1.MutatingWebhook{{ - Name: config.ConfigMapPodTrafficManager + ".naison.xxx", // 没意义的 + Name: config.ConfigMapPodTrafficManager + ".naison.io", // 没意义的 ClientConfig: admissionv1.WebhookClientConfig{ Service: &admissionv1.ServiceReference{ Namespace: namespace, @@ -391,7 +394,11 @@ out: }, }, metav1.CreateOptions{}) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create MutatingWebhookConfigurations, err: %v", err) + } + _, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) + if err != nil { + return } return net.ParseIP(svc.Spec.ClusterIP), nil } diff --git a/pkg/handler/reset.go b/pkg/handler/reset.go index 0f38a7f9..b66c93ab 100644 --- a/pkg/handler/reset.go +++ b/pkg/handler/reset.go @@ -2,7 +2,6 @@ package handler import ( "context" - "strconv" "strings" log "github.com/sirupsen/logrus" @@ -22,7 +21,7 @@ func (c *ConnectOptions) Reset(ctx2 context.Context) error { return err } var v = make([]*controlplane.Virtual, 0) - if str, ok := cm.Data[config.KeyEnvoy]; ok { + if str, ok := cm.Data[config.KeyEnvoy]; ok && len(str) != 0 { if err = yaml.Unmarshal([]byte(str), &v); err != nil { log.Error(err) return err @@ -40,11 +39,6 @@ func (c *ConnectOptions) Reset(ctx2 context.Context) error { } } } - curCount := 0 - if ref := cm.GetAnnotations()[config.AnnoRefCount]; len(ref) > 0 { - curCount, err = strconv.Atoi(ref) - } - updateRefCount(c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, 0-curCount) - cleanupIfRefCountIsZero(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager) + cleanup(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager) return nil } diff --git a/pkg/util/log.go b/pkg/util/log.go index 50dd2c4d..ac6d52b9 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -2,6 +2,7 @@ package util import ( "fmt" + log "github.com/sirupsen/logrus" ) @@ -18,6 +19,7 @@ type Format struct { } // 2009/01/23 01:23:23 d.go:23: message +// // same like log.SetFlags(log.LstdFlags | log.Lshortfile) func (*Format) Format(e *log.Entry) ([]byte, error) { return []byte(