export keepalive option

This commit is contained in:
chaoyuepan
2020-12-10 20:25:24 +08:00
parent 2bf5ca3937
commit 211e8f854f
4 changed files with 35 additions and 14 deletions

View File

@@ -44,12 +44,13 @@ func (e ServiceError) Error() string {
// DefaultOption is a common option configuration for client.
var DefaultOption = Option{
Retries: 3,
RPCPath: share.DefaultRPCPath,
ConnectTimeout: 10 * time.Second,
SerializeType: protocol.MsgPack,
CompressType: protocol.None,
BackupLatency: 10 * time.Millisecond,
Retries: 3,
RPCPath: share.DefaultRPCPath,
ConnectTimeout: 10 * time.Second,
SerializeType: protocol.MsgPack,
CompressType: protocol.None,
BackupLatency: 10 * time.Millisecond,
TCPKeepAlivePeriod: time.Minute,
}
// Breaker is a CircuitBreaker interface.
@@ -149,8 +150,12 @@ type Option struct {
SerializeType protocol.SerializeType
CompressType protocol.CompressType
// send heartbeat message to service and check responses
Heartbeat bool
HeartbeatInterval time.Duration
// TCPKeepAlive, if it is zero we don't set keepalive
TCPKeepAlivePeriod time.Duration
}
// Call represents an active RPC.

View File

@@ -46,6 +46,11 @@ func (c *Client) Connect(network, address string) error {
}
if err == nil && conn != nil {
if tc, ok := conn.(*net.TCPConn); ok && c.option.TCPKeepAlivePeriod > 0 {
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(c.option.TCPKeepAlivePeriod)
}
if c.option.IdleTimeout != 0 {
_ = conn.SetDeadline(time.Now().Add(c.option.IdleTimeout))
}
@@ -94,11 +99,6 @@ func newDirectConn(c *Client, network, address string) (net.Conn, error) {
return nil, err
}
if tc, ok := conn.(*net.TCPConn); ok {
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(3 * time.Minute)
}
return conn, nil
}

10
server/options.go Normal file
View File

@@ -0,0 +1,10 @@
package server
import "time"
// WithTCPKeepAlivePeriod sets tcp keepalive period.
func WithTCPKeepAlivePeriod(period time.Duration) OptionFn {
return func(s *Server) {
s.options["TCPKeepAlivePeriod"] = period
}
}

View File

@@ -112,6 +112,9 @@ func NewServer(options ...OptionFn) *Server {
op(s)
}
if s.options["TCPKeepAlivePeriod"] == nil {
s.options["TCPKeepAlivePeriod"] = 3 * time.Minute
}
return s
}
@@ -277,9 +280,12 @@ func (s *Server) serveListener(ln net.Listener) error {
tempDelay = 0
if tc, ok := conn.(*net.TCPConn); ok {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
tc.SetLinger(10)
period := s.options["TCPKeepAlivePeriod"]
if period != nil {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(period.(time.Duration))
tc.SetLinger(10)
}
}
conn, ok := s.Plugins.DoPostConnAccept(conn)