From cc6475f6a88bcd19bcdb3d8765009255713e48db Mon Sep 17 00:00:00 2001 From: hdt3213 Date: Sat, 21 Nov 2020 22:35:50 +0800 Subject: [PATCH] refine some code --- go.mod | 1 - go.sum | 8 ++++++-- src/redis/client/client.go | 28 +++++++++++++++++----------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 148a03b..161004c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/HDT3213/godis go 1.12 require ( - github.com/jolestar/go-commons-pool v2.0.0+incompatible // indirect github.com/jolestar/go-commons-pool/v2 v2.1.1 github.com/shopspring/decimal v1.2.0 ) diff --git a/go.sum b/go.sum index 96dfe07..63a3969 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,19 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE= -github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8= github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA= github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/src/redis/client/client.go b/src/redis/client/client.go index 3a77d2c..f997787 100644 --- a/src/redis/client/client.go +++ b/src/redis/client/client.go @@ -34,6 +34,7 @@ type Request struct { reply redis.Reply heartbeat bool waiting *wait.Wait + err error } const ( @@ -69,15 +70,15 @@ func (client *Client) Start() { } func (client *Client) Close() { - // send stop signal - client.cancelFunc() + // stop new request + close(client.sendingReqs) // wait stop process client.writing.Wait() // clean + client.cancelFunc() _ = client.conn.Close() - close(client.sendingReqs) close(client.waitingReqs) } @@ -120,17 +121,16 @@ loop: } func (client *Client) handleWrite() { - client.writing.Add(1) loop: for { select { case req := <-client.sendingReqs: + client.writing.Add(1) client.doRequest(req) case <-client.ctx.Done(): break loop } } - client.writing.Done() } // todo: wait with timeout @@ -146,6 +146,9 @@ func (client *Client) Send(args [][]byte) redis.Reply { if timeout { return reply.MakeErrReply("server time out") } + if request.err != nil { + return reply.MakeErrReply("request failed") + } return request.reply } @@ -162,6 +165,10 @@ func (client *Client) doRequest(req *Request) { } if err == nil { client.waitingReqs <- req + } else { + req.err = err + req.waiting.Done() + client.writing.Done() } } @@ -171,6 +178,7 @@ func (client *Client) finishRequest(reply redis.Reply) { if request.waiting != nil { request.waiting.Done() } + client.writing.Done() } func (client *Client) handleRead() error { @@ -294,16 +302,14 @@ func (client *Client) handleRead() error { if receivedCount == expectedArgsCount { downloading = false // finish downloading progress - request := <-client.waitingReqs if msgType == '*' { - request.reply = reply.MakeMultiBulkReply(args) + reply := reply.MakeMultiBulkReply(args) + client.finishRequest(reply) } else if msgType == '$' { - request.reply = reply.MakeBulkReply(args[0]) + reply := reply.MakeBulkReply(args[0]) + client.finishRequest(reply) } - if request.waiting != nil { - request.waiting.Done() - } // finish reply expectedArgsCount = 0