From c0eb0c07f2c01f96f371f1476944d51ae6714424 Mon Sep 17 00:00:00 2001 From: VaalaCat Date: Sun, 10 Aug 2025 15:45:41 +0000 Subject: [PATCH] fix: start timeout --- cmd/frpp/shared/cmd.go | 4 +++ defs/const.go | 1 + services/clientrpc/rpc_service.go | 9 ++++++ services/rpc/master.go | 11 +++++++- utils/logger/init.go | 14 ++++++++-- utils/wsgrpc/wsgrpc.go | 46 +++++++++++++++++++++++-------- 6 files changed, 70 insertions(+), 15 deletions(-) diff --git a/cmd/frpp/shared/cmd.go b/cmd/frpp/shared/cmd.go index 14b31fc..23eb60e 100644 --- a/cmd/frpp/shared/cmd.go +++ b/cmd/frpp/shared/cmd.go @@ -41,6 +41,7 @@ func BuildCommand(fs embed.FS) *cobra.Command { logger.UpdateLoggerOpt( cfg.Logger.FRPLoggerLevel, cfg.Logger.DefaultLoggerLevel, + cfg.IsDebug, ) return NewRootCmd( @@ -152,6 +153,7 @@ func NewMasterCmd(cfg conf.Config, fs embed.FS) *cobra.Command { warnDepParam(cmd) opts := []fx.Option{ + fx.StartTimeout(defs.AppStartTimeout), commonMod, masterMod, serverMod, @@ -198,6 +200,7 @@ func NewClientCmd(cfg conf.Config) *cobra.Command { warnDepParam(cmd) opts := []fx.Option{ + fx.StartTimeout(defs.AppStartTimeout), clientMod, commonMod, fx.Supply( @@ -243,6 +246,7 @@ func NewServerCmd(cfg conf.Config) *cobra.Command { warnDepParam(cmd) opts := []fx.Option{ + fx.StartTimeout(defs.AppStartTimeout), serverMod, commonMod, fx.Supply( diff --git a/defs/const.go b/defs/const.go index 7aa650d..4128f5a 100644 --- a/defs/const.go +++ b/defs/const.go @@ -71,6 +71,7 @@ const ( PullConfigDuration = 30 * time.Second PushProxyInfoDuration = 30 * time.Second PullClientWorkersDuration = 30 * time.Second + AppStartTimeout = 5 * time.Minute ) const ( diff --git a/services/clientrpc/rpc_service.go b/services/clientrpc/rpc_service.go index 77038ff..60585f8 100644 --- a/services/clientrpc/rpc_service.go +++ b/services/clientrpc/rpc_service.go @@ -1,8 +1,11 @@ package clientrpc import ( + "context" + "github.com/VaalaCat/frp-panel/pb" "github.com/VaalaCat/frp-panel/services/app" + "github.com/VaalaCat/frp-panel/utils/logger" ) type ClientRPCHandler interface { @@ -42,6 +45,12 @@ func NewClientRPCHandler( } func (s *clientRPCHandler) Run() { + defer func() { + if err := recover(); err != nil { + logger.Logger(context.Background()).Fatalf("client rpc handler panic: %v", err) + } + }() + startClientRpcHandler(s.appInstance, s.rpcClient, s.done, s.clientID, s.clientSecret, s.event, s.handerFunc) } diff --git a/services/rpc/master.go b/services/rpc/master.go index 3a61b74..4f469a6 100644 --- a/services/rpc/master.go +++ b/services/rpc/master.go @@ -11,6 +11,7 @@ import ( "github.com/VaalaCat/frp-panel/defs" "github.com/VaalaCat/frp-panel/pb" "github.com/VaalaCat/frp-panel/services/app" + "github.com/VaalaCat/frp-panel/utils" "github.com/VaalaCat/frp-panel/utils/logger" "github.com/VaalaCat/frp-panel/utils/wsgrpc" "github.com/imroc/req/v3" @@ -34,6 +35,7 @@ func (m *masterClient) Call() pb.MasterClient { } func NewMasterCli(appInstance app.Application) *masterClient { + logger.Logger(context.Background()).Debugf("creating new master client") return &masterClient{ inited: false, appInstance: appInstance, @@ -60,16 +62,23 @@ func newMasterCli(appInstance app.Application) pb.MasterClient { wsURL := fmt.Sprintf("%s://%s/wsgrpc", connInfo.Scheme, connInfo.Host) header := http.Header{} - wsDialer := wsgrpc.WebsocketDialer(wsURL, header, appInstance.GetConfig().Client.TLSInsecureSkipVerify) + wsDialer := wsgrpc.WebsocketDialer(wsURL, + header, + appInstance.GetConfig().Client.TLSInsecureSkipVerify, + logger.Logger(ctx), + ) opt = append(opt, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(wsDialer)) } + logger.Logger(ctx).Debugf("creating new grpc client to [%s]", utils.MarshalForJson(connInfo)) conn, err := grpc.NewClient(connInfo.Host, opt...) if err != nil { logger.Logger(ctx).Fatalf("did not connect: %v", err) } + logger.Logger(ctx).Debugf("grpc client created") + return pb.NewMasterClient(conn) } diff --git a/utils/logger/init.go b/utils/logger/init.go index 5b97549..b1e6f32 100644 --- a/utils/logger/init.go +++ b/utils/logger/init.go @@ -34,17 +34,25 @@ func InitLogger() { logrus.SetFormatter(NewCustomFormatter(false, true)) } -func UpdateLoggerOpt(frpLogLevel string, logrusLevel string) { +func UpdateLoggerOpt(frpLogLevel string, logrusLevel string, isDebug bool) { ctx := context.Background() frpLogLevel = strings.ToLower(frpLogLevel) logrusLevel = strings.ToLower(logrusLevel) if frpLogLevel == "" { - frpLogLevel = "info" + if isDebug { + frpLogLevel = "trace" + } else { + frpLogLevel = "info" + } } if logrusLevel == "" { - logrusLevel = "info" + if isDebug { + logrusLevel = "trace" + } else { + logrusLevel = "info" + } } frpLv, err := log.ParseLevel(frpLogLevel) diff --git a/utils/wsgrpc/wsgrpc.go b/utils/wsgrpc/wsgrpc.go index 1a6b95e..b135375 100644 --- a/utils/wsgrpc/wsgrpc.go +++ b/utils/wsgrpc/wsgrpc.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/tls" + "errors" "fmt" "net" "net/http" @@ -14,6 +15,10 @@ import ( "github.com/gorilla/websocket" ) +var ( + WSGrpcError = errors.New("wsgrpc error") +) + // --------------------------------------- // 通用 websocketConn 实现 net.Conn 接口 // --------------------------------------- @@ -35,16 +40,20 @@ func (c *websocketConn) Read(p []byte) (int, error) { if c.readBuffer.Len() == 0 { messageType, data, err := c.ws.ReadMessage() if err != nil { - return 0, err + return 0, errors.Join(err, errors.New("wsgrpc read message error"), WSGrpcError) } // 只接受二进制数据 if messageType != websocket.BinaryMessage { - return 0, fmt.Errorf("unexpected message type: %d", messageType) + return 0, errors.Join(fmt.Errorf("unexpected message type: %d", messageType), WSGrpcError) } c.readBuffer.Write(data) } - return c.readBuffer.Read(p) + if n, err := c.readBuffer.Read(p); err != nil { + return n, errors.Join(err, WSGrpcError) + } else { + return n, nil + } } // Write 将数据作为单条二进制消息发送 @@ -54,14 +63,18 @@ func (c *websocketConn) Write(p []byte) (int, error) { err := c.ws.WriteMessage(websocket.BinaryMessage, p) if err != nil { - return 0, err + return 0, errors.Join(err, errors.New("wsgrpc write message error"), WSGrpcError) } return len(p), nil } // Close 关闭 websocket 连接 func (c *websocketConn) Close() error { - return c.ws.Close() + err := c.ws.Close() + if err != nil { + return errors.Join(err, errors.New("wsgrpc close error"), WSGrpcError) + } + return nil } // LocalAddr 返回本地地址,通过 websocket 底层连接获取 @@ -83,9 +96,12 @@ func (c *websocketConn) RemoteAddr() net.Addr { // SetDeadline 同时设置读写超时 func (c *websocketConn) SetDeadline(t time.Time) error { if err := c.ws.SetReadDeadline(t); err != nil { - return err + return errors.Join(err, errors.New("wsgrpc set read deadline error"), WSGrpcError) } - return c.ws.SetWriteDeadline(t) + if err := c.ws.SetWriteDeadline(t); err != nil { + return errors.Join(err, errors.New("wsgrpc set write deadline error"), WSGrpcError) + } + return nil } // SetReadDeadline 设置读超时 @@ -101,18 +117,26 @@ func (c *websocketConn) SetWriteDeadline(t time.Time) error { // --------------------------------------- // 客户端 WebSocket Dialer // --------------------------------------- +type LogInterface interface { + Infof(format string, args ...interface{}) + Errorf(format string, args ...interface{}) + Tracef(format string, args ...interface{}) +} // WebsocketDialer 返回一个可以用于 grpc.WithContextDialer 的拨号函数;该函数通过 websocket 建立连接。 // 参数 url 表示 websocket 服务器地址;header 可用于传递额外的 header 参数。 -func WebsocketDialer(url string, header http.Header, insecure bool) func(ctx context.Context, addr string) (net.Conn, error) { +func WebsocketDialer(url string, header http.Header, insecure bool, log LogInterface) func(ctx context.Context, addr string) (net.Conn, error) { return func(ctx context.Context, addr string) (net.Conn, error) { dialer := websocket.Dialer{ TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, } + log.Tracef("dialing websocket server [%s]", url) ws, _, err := dialer.DialContext(ctx, url, header) if err != nil { - return nil, err + log.Errorf("wsgrpc dialer error: %v", err) + return nil, errors.Join(err, errors.New("wsgrpc dialer error"), WSGrpcError) } + log.Tracef("websocket connection connect done") return &websocketConn{ws: ws}, nil } } @@ -160,11 +184,11 @@ func (l *WSListener) Accept() (net.Conn, error) { select { case conn, ok := <-l.connCh: if !ok { - return nil, fmt.Errorf("listener closed") + return nil, errors.Join(fmt.Errorf("listener closed"), WSGrpcError) } return conn, nil case <-l.done: - return nil, fmt.Errorf("listener closed") + return nil, errors.Join(fmt.Errorf("listener closed"), WSGrpcError) } }