From 67a9eae62fb29610b3b51fd27db0c3c62593aeef Mon Sep 17 00:00:00 2001 From: finley Date: Mon, 14 Nov 2022 22:41:43 +0800 Subject: [PATCH] use self-developed connection pool --- cluster/client_pool.go | 51 ------------- cluster/cluster.go | 53 +++++++++---- cluster/com.go | 16 ++-- go.mod | 2 - go.sum | 21 ----- lib/pool/pool.go | 131 ++++++++++++++++++++++++++++++++ lib/pool/pool_test.go | 169 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 347 insertions(+), 96 deletions(-) delete mode 100644 cluster/client_pool.go create mode 100644 lib/pool/pool.go create mode 100644 lib/pool/pool_test.go diff --git a/cluster/client_pool.go b/cluster/client_pool.go deleted file mode 100644 index 826c2b3..0000000 --- a/cluster/client_pool.go +++ /dev/null @@ -1,51 +0,0 @@ -package cluster - -import ( - "context" - "errors" - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/client" - "github.com/jolestar/go-commons-pool/v2" -) - -type connectionFactory struct { - Peer string -} - -func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { - c, err := client.MakeClient(f.Peer) - if err != nil { - return nil, err - } - c.Start() - // all peers of cluster should use the same password - if config.Properties.RequirePass != "" { - c.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass)) - } - return pool.NewPooledObject(c), nil -} - -func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { - c, ok := object.Object.(*client.Client) - if !ok { - return errors.New("type mismatch") - } - c.Close() - return nil -} - -func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { - // do validate - return true -} - -func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { - // do activate - return nil -} - -func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { - // do passivate - return nil -} diff --git a/cluster/cluster.go b/cluster/cluster.go index 8b5aeae..f16a2c8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,7 +2,6 @@ package cluster import ( - "context" "fmt" "github.com/hdt3213/godis/config" database2 "github.com/hdt3213/godis/database" @@ -12,8 +11,10 @@ import ( "github.com/hdt3213/godis/lib/consistenthash" "github.com/hdt3213/godis/lib/idgenerator" "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/pool" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/client" "github.com/hdt3213/godis/redis/protocol" - "github.com/jolestar/go-commons-pool/v2" "runtime/debug" "strings" ) @@ -28,9 +29,9 @@ type PeerPicker interface { type Cluster struct { self string - nodes []string - peerPicker PeerPicker - peerConnection map[string]*pool.ObjectPool + nodes []string + peerPicker PeerPicker + nodeConnections map[string]*pool.Pool db database.EmbedDB transactions *dict.SimpleDict // id -> Transaction @@ -52,10 +53,10 @@ func MakeCluster() *Cluster { cluster := &Cluster{ self: config.Properties.Self, - db: database2.NewStandaloneServer(), - transactions: dict.MakeSimple(), - peerPicker: consistenthash.New(replicas, nil), - peerConnection: make(map[string]*pool.ObjectPool), + db: database2.NewStandaloneServer(), + transactions: dict.MakeSimple(), + peerPicker: consistenthash.New(replicas, nil), + nodeConnections: make(map[string]*pool.Pool), idGenerator: idgenerator.MakeGenerator(config.Properties.Self), relayImpl: defaultRelayImpl, @@ -71,11 +72,32 @@ func MakeCluster() *Cluster { } nodes = append(nodes, config.Properties.Self) cluster.peerPicker.AddNode(nodes...) - ctx := context.Background() - for _, peer := range config.Properties.Peers { - cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{ - Peer: peer, - }) + connectionPoolConfig := pool.Config{ + MaxIdle: 1, + MaxActive: 16, + } + for _, p := range config.Properties.Peers { + peer := p + factory := func() (interface{}, error) { + c, err := client.MakeClient(peer) + if err != nil { + return nil, err + } + c.Start() + // all peers of cluster should use the same password + if config.Properties.RequirePass != "" { + c.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass)) + } + return c, nil + } + finalizer := func(x interface{}) { + cli, ok := x.(client.Client) + if !ok { + return + } + cli.Close() + } + cluster.nodeConnections[peer] = pool.New(factory, finalizer, connectionPoolConfig) } cluster.nodes = nodes return cluster @@ -87,6 +109,9 @@ type CmdFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R // Close stops current node of cluster func (cluster *Cluster) Close() { cluster.db.Close() + for _, pool := range cluster.nodeConnections { + pool.Close() + } } var router = makeRouter() diff --git a/cluster/com.go b/cluster/com.go index 652d62f..d41d9cc 100644 --- a/cluster/com.go +++ b/cluster/com.go @@ -1,7 +1,6 @@ package cluster import ( - "context" "errors" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/utils" @@ -11,27 +10,28 @@ import ( ) func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) { - factory, ok := cluster.peerConnection[peer] + pool, ok := cluster.nodeConnections[peer] if !ok { - return nil, errors.New("connection factory not found") + return nil, errors.New("connection pool not found") } - raw, err := factory.BorrowObject(context.Background()) + raw, err := pool.Get() if err != nil { return nil, err } conn, ok := raw.(*client.Client) if !ok { - return nil, errors.New("connection factory make wrong type") + return nil, errors.New("connection pool make wrong type") } return conn, nil } func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) error { - connectionFactory, ok := cluster.peerConnection[peer] + pool, ok := cluster.nodeConnections[peer] if !ok { - return errors.New("connection factory not found") + return errors.New("connection pool not found") } - return connectionFactory.ReturnObject(context.Background(), peerClient) + pool.Put(peerClient) + return nil } var defaultRelayImpl = func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply { diff --git a/go.mod b/go.mod index 61e283a..2ad85f8 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,5 @@ go 1.16 require ( github.com/hdt3213/rdb v1.0.5 - github.com/jolestar/go-commons-pool/v2 v2.1.1 github.com/shopspring/decimal v1.2.0 - gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 1ba0377..7ce635b 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,6 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= -github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/hdt3213/rdb v1.0.2 h1:mPXShIqjuzgioBkwllj8XnlRQaPtbulNyuXeycxOMGs= -github.com/hdt3213/rdb v1.0.2/go.mod h1:m2CaP16oqYROIQMUUjB3WkqQWfDi/VebnHUDVRl4cIM= github.com/hdt3213/rdb v1.0.5 h1:toBvrixNWOlK26bHR1Amch/9+ioguL2jJT+uaMPYtJc= github.com/hdt3213/rdb v1.0.5/go.mod h1:dLJXf6wM7ZExH+PuEzbzUubTtkH61ilfAtPSSQgfs4w= -github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA= -github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c= -gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/lib/pool/pool.go b/lib/pool/pool.go new file mode 100644 index 0000000..52a8f42 --- /dev/null +++ b/lib/pool/pool.go @@ -0,0 +1,131 @@ +package pool + +import ( + "errors" + "sync" +) + +var ( + ErrClosed = errors.New("pool closed") + ErrMax = errors.New("reach max connection limit") +) + +type request chan interface{} + +type Config struct { + MaxIdle uint + MaxActive uint +} + +// Pool stores object for reusing, such as redis connection +type Pool struct { + Config + factory func() (interface{}, error) + finalizer func(x interface{}) + idles chan interface{} + waitingReqs []request + activeCount uint // increases during creating connection, decrease during destroying connection + mu sync.Mutex + closed bool +} + +func New(factory func() (interface{}, error), finalizer func(x interface{}), cfg Config) *Pool { + return &Pool{ + factory: factory, + finalizer: finalizer, + idles: make(chan interface{}, cfg.MaxIdle), + waitingReqs: make([]request, 0), + Config: cfg, + } +} + +// getOnNoIdle try to create a new connection or waiting for connection being returned +// invoker should have pool.mu +func (pool *Pool) getOnNoIdle() (interface{}, error) { + if pool.activeCount >= pool.MaxActive { + // waiting for connection being returned + req := make(chan interface{}, 1) + pool.waitingReqs = append(pool.waitingReqs, req) + pool.mu.Unlock() + x, ok := <-req + if !ok { + return nil, ErrMax + } + return x, nil + } + + // create a new connection + pool.activeCount++ // hold a place for new connection + pool.mu.Unlock() + x, err := pool.factory() + if err != nil { + // create failed return token + pool.mu.Lock() + pool.activeCount-- // release the holding place + pool.mu.Unlock() + return nil, err + } + return x, nil +} + +func (pool *Pool) Get() (interface{}, error) { + pool.mu.Lock() + if pool.closed { + pool.mu.Unlock() + return nil, ErrClosed + } + + select { + case item := <-pool.idles: + pool.mu.Unlock() + return item, nil + default: + // no pooled item, create one + return pool.getOnNoIdle() + } +} + +func (pool *Pool) Put(x interface{}) { + pool.mu.Lock() + + if pool.closed { + pool.mu.Unlock() + pool.finalizer(x) + return + } + + if len(pool.waitingReqs) > 0 { + req := pool.waitingReqs[0] + copy(pool.waitingReqs, pool.waitingReqs[1:]) + pool.waitingReqs = pool.waitingReqs[:len(pool.waitingReqs)-1] + req <- x + pool.mu.Unlock() + return + } + + select { + case pool.idles <- x: + pool.mu.Unlock() + return + default: + // reach max idle, destroy redundant item + pool.mu.Unlock() + pool.activeCount-- + pool.finalizer(x) + } +} + +func (pool *Pool) Close() { + pool.mu.Lock() + if pool.closed { + pool.mu.Unlock() + return + } + pool.closed = true + close(pool.idles) + pool.mu.Unlock() + + for x := range pool.idles { + pool.finalizer(x) + } +} diff --git a/lib/pool/pool_test.go b/lib/pool/pool_test.go new file mode 100644 index 0000000..c483c38 --- /dev/null +++ b/lib/pool/pool_test.go @@ -0,0 +1,169 @@ +package pool + +import ( + "errors" + "testing" + "time" +) + +type mockConn struct { + open bool +} + +func TestPool(t *testing.T) { + connNum := 0 + factory := func() (interface{}, error) { + connNum++ + return &mockConn{ + open: true, + }, nil + } + finalizer := func(x interface{}) { + connNum-- + c := x.(*mockConn) + c.open = false + } + cfg := Config{ + MaxIdle: 20, + MaxActive: 40, + } + pool := New(factory, finalizer, cfg) + var borrowed []*mockConn + for i := 0; i < int(cfg.MaxActive); i++ { + x, err := pool.Get() + if err != nil { + t.Error(err) + return + } + c := x.(*mockConn) + if !c.open { + t.Error("conn is not open") + return + } + borrowed = append(borrowed, c) + } + for _, c := range borrowed { + pool.Put(c) + } + borrowed = nil + // borrow returned + for i := 0; i < int(cfg.MaxActive); i++ { + x, err := pool.Get() + if err != nil { + t.Error(err) + return + } + c := x.(*mockConn) + if !c.open { + t.Error("conn is not open") + return + } + borrowed = append(borrowed, c) + } + for i, c := range borrowed { + if i < len(borrowed)-1 { + pool.Put(c) + } + } + pool.Close() + pool.Close() // test close twice + pool.Put(borrowed[len(borrowed)-1]) + if connNum != 0 { + t.Errorf("%d connections has not closed", connNum) + } + _, err := pool.Get() + if err != ErrClosed { + t.Error("expect err closed") + } +} + +func TestPool_Waiting(t *testing.T) { + factory := func() (interface{}, error) { + return &mockConn{ + open: true, + }, nil + } + finalizer := func(x interface{}) { + c := x.(*mockConn) + c.open = false + } + cfg := Config{ + MaxIdle: 2, + MaxActive: 4, + } + pool := New(factory, finalizer, cfg) + var borrowed []*mockConn + for i := 0; i < int(cfg.MaxActive); i++ { + x, err := pool.Get() + if err != nil { + t.Error(err) + return + } + c := x.(*mockConn) + if !c.open { + t.Error("conn is not open") + return + } + borrowed = append(borrowed, c) + } + getResult := make(chan bool, 0) + go func() { + x, err := pool.Get() + if err != nil { + t.Error(err) + getResult <- false + return + } + c := x.(*mockConn) + if !c.open { + t.Error("conn is not open") + getResult <- false + return + } + getResult <- true + }() + time.Sleep(time.Second) + pool.Put(borrowed[0]) + if ret := <-getResult; !ret { + t.Error("get and waiting returned failed") + } +} + +func TestPool_CreateErr(t *testing.T) { + makeErr := true + factory := func() (interface{}, error) { + if makeErr { + makeErr = false + return nil, errors.New("mock err") + } + return &mockConn{ + open: true, + }, nil + } + finalizer := func(x interface{}) { + c := x.(*mockConn) + c.open = false + } + cfg := Config{ + MaxIdle: 2, + MaxActive: 4, + } + pool := New(factory, finalizer, cfg) + _, err := pool.Get() + if err == nil { + t.Error("expecting err") + return + } + x, err := pool.Get() + if err != nil { + t.Error("get err") + return + } + pool.Put(x) + _, err = pool.Get() + if err != nil { + t.Error("get err") + return + } + +}