refactor: split connect and proxy mode (#495)

This commit is contained in:
naison
2025-03-30 11:46:37 +08:00
committed by GitHub
parent fb428403a2
commit 08bcbe1611
10 changed files with 1235 additions and 1224 deletions

View File

@@ -25,6 +25,8 @@ import (
)
func CmdProxy(f cmdutil.Factory) *cobra.Command {
var headers = make(map[string]string)
var portmap []string
var connect = handler.ConnectOptions{}
var extraRoute = &handler.ExtraRouteInfo{}
var sshConf = &pkgssh.SshConfig{}
@@ -122,11 +124,11 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
cli := daemon.GetClient(false)
client, err := cli.Proxy(
cmd.Context(),
&rpc.ConnectRequest{
&rpc.ProxyRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: connect.Headers,
PortMap: connect.PortMap,
Headers: headers,
PortMap: portmap,
Workloads: args,
ExtraRoute: extraRoute.ToRPC(),
Engine: string(connect.Engine),
@@ -163,8 +165,8 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
return nil
},
}
cmd.Flags().StringToStringVarP(&connect.Headers, "headers", "H", map[string]string{}, "Traffic with special headers (use `and` to match all headers) with reverse it to local PC, If not special, redirect all traffic to local PC. format: <KEY>=<VALUE> eg: --headers foo=bar --headers env=dev")
cmd.Flags().StringArrayVar(&connect.PortMap, "portmap", []string{}, "Port map, map container port to local port, format: [tcp/udp]/containerPort:localPort, If not special, localPort will use containerPort. eg: tcp/80:8080 or udp/5000:5001 or 80 or 80:8080")
cmd.Flags().StringToStringVarP(&headers, "headers", "H", map[string]string{}, "Traffic with special headers (use `and` to match all headers) with reverse it to local PC, If not special, redirect all traffic to local PC. format: <KEY>=<VALUE> eg: --headers foo=bar --headers env=dev")
cmd.Flags().StringArrayVar(&portmap, "portmap", []string{}, "Port map, map container port to local port, format: [tcp/udp]/containerPort:localPort, If not special, localPort will use containerPort. eg: tcp/80:8080 or udp/5000:5001 or 80 or 80:8080")
handler.AddCommonFlags(cmd.Flags(), &transferImage, &imagePullSecretName, &connect.Engine)
cmd.Flags().BoolVar(&foreground, "foreground", false, "foreground hang up")

View File

@@ -25,8 +25,6 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
ctx := resp.Context()
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,
@@ -92,8 +90,6 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
}
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,

View File

@@ -44,9 +44,6 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
svr.t = time.Now()
svr.connect = &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
PortMap: req.PortMap,
Workloads: req.Workloads,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,
@@ -108,9 +105,6 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
}
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
PortMap: req.PortMap,
Workloads: req.Workloads,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,

View File

@@ -24,15 +24,12 @@ import (
// 2. if already connect to cluster
// 2.1 disconnect from cluster
// 2.2 same as step 1
func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (e error) {
func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e error) {
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newProxyWarp(resp), svr.LogFile))
config.Image = req.Image
ctx := plog.WithLogger(resp.Context(), logger)
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
PortMap: req.PortMap,
Workloads: req.Workloads,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,
@@ -59,7 +56,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (
return err
}
var workloads []string
workloads, err = util.NormalizedResource(ctx, connect.GetFactory(), connect.GetClientset(), connect.Namespace, connect.Workloads)
workloads, err = util.NormalizedResource(ctx, connect.GetFactory(), connect.GetClientset(), connect.Namespace, req.Workloads)
if err != nil {
return err
}
@@ -110,7 +107,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (
if svr.connect == nil {
plog.G(ctx).Debugf("Connectting to cluster")
var connResp rpc.Daemon_ConnectClient
connResp, err = daemonClient.Connect(ctx, req)
connResp, err = daemonClient.Connect(ctx, convert(req))
if err != nil {
return err
}
@@ -133,7 +130,7 @@ type proxyWarp struct {
}
func (r *proxyWarp) Write(p []byte) (n int, err error) {
_ = r.server.Send(&rpc.ConnectResponse{
_ = r.server.Send(&rpc.ProxyResponse{
Message: string(p),
})
return len(p), nil
@@ -142,3 +139,19 @@ func (r *proxyWarp) Write(p []byte) (n int, err error) {
func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer {
return &proxyWarp{server: server}
}
func convert(req *rpc.ProxyRequest) *rpc.ConnectRequest {
return &rpc.ConnectRequest{
KubeconfigBytes: req.KubeconfigBytes,
Namespace: req.Namespace,
Engine: req.Engine,
ExtraRoute: req.ExtraRoute,
SshJump: req.SshJump,
TransferImage: req.TransferImage,
Image: req.Image,
ImagePullSecretName: req.ImagePullSecretName,
Foreground: req.Foreground,
Level: req.Level,
OriginKubeconfigPath: req.OriginKubeconfigPath,
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ service Daemon {
rpc Connect (ConnectRequest) returns (stream ConnectResponse) {}
rpc ConnectFork (ConnectRequest) returns (stream ConnectResponse) {}
rpc Disconnect (DisconnectRequest) returns (stream DisconnectResponse) {}
rpc Proxy (ConnectRequest) returns (stream ConnectResponse) {}
rpc Proxy (ProxyRequest) returns (stream ProxyResponse) {}
rpc Leave (LeaveRequest) returns (stream LeaveResponse) {}
rpc Clone (CloneRequest) returns (stream CloneResponse) {}
rpc Remove (RemoveRequest) returns (stream RemoveResponse) {}
@@ -33,6 +33,53 @@ service Daemon {
}
message ConnectRequest {
string KubeconfigBytes = 1;
string Namespace = 2;
string Engine = 3;
// extra route table info
ExtraRoute ExtraRoute = 4;
// ssh jump
SshJump SshJump = 5;
// transfer image
bool TransferImage = 6;
string Image = 7;
string ImagePullSecretName = 8;
// foreground
bool Foreground = 9;
// log level
int32 Level = 10;
string OriginKubeconfigPath = 11;
}
message ConnectResponse {
string message = 1;
}
message DisconnectRequest {
// 1) disconnect by id
optional int32 ID = 1;
// 2) disconnect all
optional bool All = 2;
// 3) disconnect by kubeConfig
optional string KubeconfigBytes = 3;
optional string Namespace = 4;
SshJump SshJump = 5;
// 4) disconnect by cluster ids
repeated string ClusterIDs = 6;
}
message DisconnectResponse {
string message = 1;
}
message ProxyRequest {
string KubeconfigBytes = 1;
string Namespace = 2;
map<string, string> Headers = 3;
@@ -60,24 +107,7 @@ message ConnectRequest {
string OriginKubeconfigPath = 15;
}
message ConnectResponse {
string message = 1;
}
message DisconnectRequest {
// 1) disconnect by id
optional int32 ID = 1;
// 2) disconnect all
optional bool All = 2;
// 3) disconnect by kubeConfig
optional string KubeconfigBytes = 3;
optional string Namespace = 4;
SshJump SshJump = 5;
// 4) disconnect by cluster ids
repeated string ClusterIDs = 6;
}
message DisconnectResponse {
message ProxyResponse {
string message = 1;
}

File diff suppressed because it is too large Load Diff

View File

@@ -106,7 +106,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
}
// not needs to ssh jump in daemon, because dev mode will hang up until user exit,
// so just ssh jump in client is enough
req := &rpc.ConnectRequest{
req := &rpc.ProxyRequest{
KubeconfigBytes: string(kubeConfigBytes),
Namespace: ns,
Headers: option.Headers,
@@ -131,7 +131,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
_ = util.PrintGRPCStream[rpc.DisconnectResponse](resp)
return nil
})
var resp rpc.Daemon_ConnectClient
var resp rpc.Daemon_ProxyClient
resp, err = daemonCli.Proxy(ctx, req)
if err != nil {
plog.G(ctx).Errorf("Connect to cluster error: %s", err.Error())

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.5.1
// - protoc v3.21.2
// source: dhcpserver.proto
@@ -15,8 +15,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
DHCP_RentIP_FullMethodName = "/rpc.DHCP/RentIP"
@@ -40,8 +40,9 @@ func NewDHCPClient(cc grpc.ClientConnInterface) DHCPClient {
}
func (c *dHCPClient) RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc.CallOption) (*RentIPResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RentIPResponse)
err := c.cc.Invoke(ctx, DHCP_RentIP_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, DHCP_RentIP_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -49,8 +50,9 @@ func (c *dHCPClient) RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc
}
func (c *dHCPClient) ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts ...grpc.CallOption) (*ReleaseIPResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ReleaseIPResponse)
err := c.cc.Invoke(ctx, DHCP_ReleaseIP_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, DHCP_ReleaseIP_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -59,16 +61,19 @@ func (c *dHCPClient) ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts .
// DHCPServer is the server API for DHCP service.
// All implementations must embed UnimplementedDHCPServer
// for forward compatibility
// for forward compatibility.
type DHCPServer interface {
RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error)
ReleaseIP(context.Context, *ReleaseIPRequest) (*ReleaseIPResponse, error)
mustEmbedUnimplementedDHCPServer()
}
// UnimplementedDHCPServer must be embedded to have forward compatible implementations.
type UnimplementedDHCPServer struct {
}
// UnimplementedDHCPServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedDHCPServer struct{}
func (UnimplementedDHCPServer) RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RentIP not implemented")
@@ -77,6 +82,7 @@ func (UnimplementedDHCPServer) ReleaseIP(context.Context, *ReleaseIPRequest) (*R
return nil, status.Errorf(codes.Unimplemented, "method ReleaseIP not implemented")
}
func (UnimplementedDHCPServer) mustEmbedUnimplementedDHCPServer() {}
func (UnimplementedDHCPServer) testEmbeddedByValue() {}
// UnsafeDHCPServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DHCPServer will
@@ -86,6 +92,13 @@ type UnsafeDHCPServer interface {
}
func RegisterDHCPServer(s grpc.ServiceRegistrar, srv DHCPServer) {
// If the following call pancis, it indicates UnimplementedDHCPServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&DHCP_ServiceDesc, srv)
}

View File

@@ -59,9 +59,6 @@ import (
type ConnectOptions struct {
Namespace string
Headers map[string]string
PortMap []string
Workloads []string
ExtraRouteInfo ExtraRouteInfo
Engine config.Engine
Foreground bool