From c205a7da6f11a64a00012e3888039a82b1a19ae0 Mon Sep 17 00:00:00 2001 From: smallnest Date: Wed, 15 Aug 2018 18:17:48 +0800 Subject: [PATCH] #245 shutdown gateway graceful too --- server/gateway.go | 15 ++++++++++++- server/server.go | 55 +++++++++++++++++++++++++++-------------------- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/server/gateway.go b/server/gateway.go index 761e691..67563b8 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -38,11 +38,24 @@ func (s *Server) startHTTP1APIGateway(ln net.Listener) { router.GET("/*servicePath", s.handleGatewayRequest) router.PUT("/*servicePath", s.handleGatewayRequest) - if err := http.Serve(ln, router); err != nil { + s.mu.Lock() + s.gatewayHTTPServer = &http.Server{Handler: router} + s.mu.Unlock() + if err := s.gatewayHTTPServer.Serve(ln); err != nil { log.Errorf("error in gateway Serve: %s", err) } } +func (s *Server) closeHTTP1APIGateway(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.gatewayHTTPServer != nil { + return s.gatewayHTTPServer.Shutdown(ctx) + } + + return nil +} + func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, params httprouter.Params) { if r.Header.Get(XServicePath) == "" { servicePath := params.ByName("servicePath") diff --git a/server/server.go b/server/server.go index 6e512f0..813b242 100644 --- a/server/server.go +++ b/server/server.go @@ -17,12 +17,13 @@ import ( "sync/atomic" "time" - "github.com/smallnest/rpcx/log" - "github.com/smallnest/rpcx/protocol" - "github.com/smallnest/rpcx/share" "os" "os/signal" "syscall" + + "github.com/smallnest/rpcx/log" + "github.com/smallnest/rpcx/protocol" + "github.com/smallnest/rpcx/share" ) // ErrServerClosed is returned by the Server's Serve, ListenAndServe after a call to Shutdown or Close. @@ -56,9 +57,10 @@ var ( // Server is rpcx server that use TCP or UDP. type Server struct { - ln net.Listener - readTimeout time.Duration - writeTimeout time.Duration + ln net.Listener + readTimeout time.Duration + writeTimeout time.Duration + gatewayHTTPServer *http.Server serviceMapMu sync.RWMutex serviceMap map[string]*service @@ -152,7 +154,7 @@ func (s *Server) getDoneChan() <-chan struct{} { return s.doneChan } -func (s *Server)startShutdownListener() { +func (s *Server) startShutdownListener() { go func(s *Server) { log.Info("server pid:", os.Getpid()) c := make(chan os.Signal, 1) @@ -160,7 +162,7 @@ func (s *Server)startShutdownListener() { si := <-c if si.String() == "terminated" { if nil != s.onShutdown && len(s.onShutdown) > 0 { - for _,sd := range s.onShutdown { + for _, sd := range s.onShutdown { sd(s) } } @@ -303,7 +305,7 @@ func (s *Server) serveConn(conn net.Conn) { }() if isShutdown(s) { - closeChannel(s,conn) + closeChannel(s, conn) return } @@ -324,7 +326,7 @@ func (s *Server) serveConn(conn net.Conn) { for { if isShutdown(s) { - closeChannel(s,conn) + closeChannel(s, conn) return } @@ -346,7 +348,6 @@ func (s *Server) serveConn(conn net.Conn) { return } - if s.writeTimeout != 0 { conn.SetWriteDeadline(t0.Add(s.writeTimeout)) } @@ -423,11 +424,11 @@ func (s *Server) serveConn(conn net.Conn) { } } -func isShutdown(s *Server) (bool) { +func isShutdown(s *Server) bool { return atomic.LoadInt32(&s.inShutdown) == 1 } -func closeChannel(s *Server,conn net.Conn) { +func closeChannel(s *Server, conn net.Conn) { s.mu.Lock() delete(s.activeConn, conn) s.mu.Unlock() @@ -640,13 +641,13 @@ func (s *Server) RegisterOnShutdown(f func(s *Server)) { var shutdownPollInterval = 1000 * time.Millisecond -// // Shutdown gracefully shuts down the server without interrupting any -// // active connections. Shutdown works by first closing the -// // listener, then closing all idle connections, and then waiting -// // indefinitely for connections to return to idle and then shut down. -// // If the provided context expires before the shutdown is complete, -// // Shutdown returns the context's error, otherwise it returns any -// // error returned from closing the Server's underlying Listener. +// Shutdown gracefully shuts down the server without interrupting any +// active connections. Shutdown works by first closing the +// listener, then closing all idle connections, and then waiting +// indefinitely for connections to return to idle and then shut down. +// If the provided context expires before the shutdown is complete, +// Shutdown returns the context's error, otherwise it returns any +// error returned from closing the Server's underlying Listener. func (s *Server) Shutdown(ctx context.Context) error { if atomic.CompareAndSwapInt32(&s.inShutdown, 0, 1) { log.Info("shutdown begin") @@ -663,15 +664,23 @@ func (s *Server) Shutdown(ctx context.Context) error { } } s.Close() + + if s.gatewayHTTPServer != nil { + if err := s.closeHTTP1APIGateway(ctx); err != nil { + log.Warnf("failed to close gateway: %v", err) + } else { + log.Info("closed gateway") + } + } log.Info("shutdown end") } return nil } -func (s *Server) checkProcessMsg() (bool) { +func (s *Server) checkProcessMsg() bool { size := s.handlerMsgNum - log.Info("need handle msg size:",size) - if size == 0 { + log.Info("need handle msg size:", size) + if size == 0 { return true } return false