feat: honor context cancellation in tun inbound loop and return a disconnect cleanup func from doConnect

This commit is contained in:
fengcaiwen
2023-09-05 19:28:57 +08:00
committed by naison
parent a7ca7853f5
commit 29f5c191a5
2 changed files with 47 additions and 31 deletions

View File

@@ -31,6 +31,12 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
} }
d.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { d.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) {
for { for {
select {
case <-ctx.Done():
return
default:
}
packetConn, err := getRemotePacketConn(ctx, h.chain) packetConn, err := getRemotePacketConn(ctx, h.chain)
if err != nil { if err != nil {
log.Debugf("[tun-client] %s - %s: %s", tun.LocalAddr(), remoteAddr, err) log.Debugf("[tun-client] %s - %s: %s", tun.LocalAddr(), remoteAddr, err)

View File

@@ -386,10 +386,15 @@ func DoDev(ctx context.Context, devOption *Options, flags *pflag.FlagSet, f cmdu
} }
// connect to cluster, in container or host // connect to cluster, in container or host
err = devOption.doConnect(ctx, f, transferImage) finalFunc, err := devOption.doConnect(ctx, f, transferImage)
if err != nil { if err != nil {
return err return err
} }
defer func() {
if finalFunc != nil {
finalFunc()
}
}()
var tempContainerConfig *containerConfig var tempContainerConfig *containerConfig
err = validatePullOpt(devOption.Options.Pull) err = validatePullOpt(devOption.Options.Pull)
@@ -418,7 +423,7 @@ func DoDev(ctx context.Context, devOption *Options, flags *pflag.FlagSet, f cmdu
return devOption.Main(ctx, tempContainerConfig) return devOption.Main(ctx, tempContainerConfig)
} }
func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImage bool) error { func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImage bool) (ff func(), err error) {
connect := &handler.ConnectOptions{ connect := &handler.ConnectOptions{
Headers: d.Headers, Headers: d.Headers,
Workloads: []string{d.Workload}, Workloads: []string{d.Workload},
@@ -426,35 +431,35 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag
ExtraDomain: d.ExtraDomain, ExtraDomain: d.ExtraDomain,
Engine: d.Engine, Engine: d.Engine,
} }
if err := connect.InitClient(f); err != nil { if err = connect.InitClient(f); err != nil {
return err return
} }
d.Namespace = connect.Namespace d.Namespace = connect.Namespace
if err := connect.PreCheckResource(); err != nil { if err = connect.PreCheckResource(); err != nil {
return err return
} }
if len(connect.Workloads) > 1 { if len(connect.Workloads) > 1 {
return fmt.Errorf("can only dev one workloads at same time, workloads: %v", connect.Workloads) return nil, fmt.Errorf("can only dev one workloads at same time, workloads: %v", connect.Workloads)
} }
if len(connect.Workloads) < 1 { if len(connect.Workloads) < 1 {
return fmt.Errorf("you must provide resource to dev, workloads : %v is invaild", connect.Workloads) return nil, fmt.Errorf("you must provide resource to dev, workloads : %v is invaild", connect.Workloads)
} }
d.Workload = connect.Workloads[0] d.Workload = connect.Workloads[0]
// if no-proxy is true, not needs to intercept traffic // if no-proxy is true, not needs to intercept traffic
if d.NoProxy { if d.NoProxy {
if len(connect.Headers) != 0 { if len(connect.Headers) != 0 {
return fmt.Errorf("not needs to provide headers if is no-proxy mode") return nil, fmt.Errorf("not needs to provide headers if is no-proxy mode")
} }
connect.Workloads = []string{} connect.Workloads = []string{}
} }
switch d.ConnectMode { switch d.ConnectMode {
case ConnectModeHost: case ConnectModeHost:
go func() { daemonClient := daemon.GetClient(true)
<-ctx.Done() ff = func() {
connectClient, err := daemon.GetClient(true).Disconnect(ctx, &rpc.DisconnectRequest{}) connectClient, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
@@ -469,10 +474,11 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag
} }
log.Print(recv.Message) log.Print(recv.Message)
} }
}() }
kubeconfig, err := util.ConvertToKubeconfigBytes(f) var kubeconfig []byte
kubeconfig, err = util.ConvertToKubeconfigBytes(f)
if err != nil { if err != nil {
return err return
} }
req := &rpc.ConnectRequest{ req := &rpc.ConnectRequest{
KubeconfigBytes: string(kubeconfig), KubeconfigBytes: string(kubeconfig),
@@ -487,43 +493,47 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag
Image: config.Image, Image: config.Image,
Level: int32(log.DebugLevel), Level: int32(log.DebugLevel),
} }
connectClient, err := daemon.GetClient(true).Connect(ctx, req) var connectClient rpc.Daemon_ConnectClient
connectClient, err = daemonClient.Connect(ctx, req)
if err != nil { if err != nil {
return err return
} }
for { for {
recv, err := connectClient.Recv() recv, err := connectClient.Recv()
if err == io.EOF { if err == io.EOF {
return nil return ff, nil
} else if err != nil { } else if err != nil {
return err return ff, err
} }
log.Print(recv.Message) log.Print(recv.Message)
} }
case ConnectModeContainer: case ConnectModeContainer:
var connectContainer *RunConfig var connectContainer *RunConfig
path, err := connect.GetKubeconfigPath() var path string
path, err = connect.GetKubeconfigPath()
if err != nil { if err != nil {
return err return
} }
var platform *specs.Platform var platform *specs.Platform
if d.Options.Platform != "" { if d.Options.Platform != "" {
p, err := platforms.Parse(d.Options.Platform) var p specs.Platform
p, err = platforms.Parse(d.Options.Platform)
if err != nil { if err != nil {
return pkgerr.Wrap(err, "error parsing specified platform") return nil, pkgerr.Wrap(err, "error parsing specified platform")
} }
platform = &p platform = &p
} }
connectContainer, err = createConnectContainer(d.NoProxy, *connect, path, d.Cli, platform) connectContainer, err = createConnectContainer(d.NoProxy, *connect, path, d.Cli, platform)
if err != nil { if err != nil {
return err return
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
var id string var id string
if id, err = run(ctx, connectContainer, d.Cli, d.DockerCli); err != nil { id, err = run(ctx, connectContainer, d.Cli, d.DockerCli)
return err if err != nil {
return
} }
h := interrupt.New(func(signal os.Signal) { h := interrupt.New(func(signal os.Signal) {
os.Exit(0) os.Exit(0)
@@ -537,17 +547,17 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag
if err = runLogsWaitRunning(ctx, d.DockerCli, id); err != nil { if err = runLogsWaitRunning(ctx, d.DockerCli, id); err != nil {
// interrupt by signal KILL // interrupt by signal KILL
if ctx.Err() == context.Canceled { if ctx.Err() == context.Canceled {
return nil return nil, nil
} }
return err return nil, err
} }
if err = d.Copts.netMode.Set("container:" + id); err != nil { if err = d.Copts.netMode.Set("container:" + id); err != nil {
return err return nil, err
} }
default: default:
return fmt.Errorf("unsupport connect mode: %s", d.ConnectMode) return nil, fmt.Errorf("unsupport connect mode: %s", d.ConnectMode)
} }
return nil return nil, nil
} }
func createConnectContainer(devOptions bool, connect handler.ConnectOptions, path string, cli *client.Client, platform *specs.Platform) (*RunConfig, error) { func createConnectContainer(devOptions bool, connect handler.ConnectOptions, path string, cli *client.Client, platform *specs.Platform) (*RunConfig, error) {