mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-05 16:57:06 +08:00
refine some code
This commit is contained in:
1
go.mod
1
go.mod
@@ -3,7 +3,6 @@ module github.com/HDT3213/godis
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/jolestar/go-commons-pool v2.0.0+incompatible // indirect
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.1
|
||||
github.com/shopspring/decimal v1.2.0
|
||||
)
|
||||
|
8
go.sum
8
go.sum
@@ -1,15 +1,19 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/jolestar/go-commons-pool v2.0.0+incompatible h1:uHn5uRKsLLQSf9f1J5QPY2xREWx/YH+e4bIIXcAuAaE=
|
||||
github.com/jolestar/go-commons-pool v2.0.0+incompatible/go.mod h1:ChJYIbIch0DMCSU6VU0t0xhPoWDR2mMFIQek3XWU0s8=
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA=
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
|
||||
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
|
||||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@@ -34,6 +34,7 @@ type Request struct {
|
||||
reply redis.Reply
|
||||
heartbeat bool
|
||||
waiting *wait.Wait
|
||||
err error
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -69,15 +70,15 @@ func (client *Client) Start() {
|
||||
}
|
||||
|
||||
func (client *Client) Close() {
|
||||
// send stop signal
|
||||
client.cancelFunc()
|
||||
// stop new request
|
||||
close(client.sendingReqs)
|
||||
|
||||
// wait stop process
|
||||
client.writing.Wait()
|
||||
|
||||
// clean
|
||||
client.cancelFunc()
|
||||
_ = client.conn.Close()
|
||||
close(client.sendingReqs)
|
||||
close(client.waitingReqs)
|
||||
}
|
||||
|
||||
@@ -120,17 +121,16 @@ loop:
|
||||
}
|
||||
|
||||
func (client *Client) handleWrite() {
|
||||
client.writing.Add(1)
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case req := <-client.sendingReqs:
|
||||
client.writing.Add(1)
|
||||
client.doRequest(req)
|
||||
case <-client.ctx.Done():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
client.writing.Done()
|
||||
}
|
||||
|
||||
// todo: wait with timeout
|
||||
@@ -146,6 +146,9 @@ func (client *Client) Send(args [][]byte) redis.Reply {
|
||||
if timeout {
|
||||
return reply.MakeErrReply("server time out")
|
||||
}
|
||||
if request.err != nil {
|
||||
return reply.MakeErrReply("request failed")
|
||||
}
|
||||
return request.reply
|
||||
}
|
||||
|
||||
@@ -162,6 +165,10 @@ func (client *Client) doRequest(req *Request) {
|
||||
}
|
||||
if err == nil {
|
||||
client.waitingReqs <- req
|
||||
} else {
|
||||
req.err = err
|
||||
req.waiting.Done()
|
||||
client.writing.Done()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,6 +178,7 @@ func (client *Client) finishRequest(reply redis.Reply) {
|
||||
if request.waiting != nil {
|
||||
request.waiting.Done()
|
||||
}
|
||||
client.writing.Done()
|
||||
}
|
||||
|
||||
func (client *Client) handleRead() error {
|
||||
@@ -294,16 +302,14 @@ func (client *Client) handleRead() error {
|
||||
if receivedCount == expectedArgsCount {
|
||||
downloading = false // finish downloading progress
|
||||
|
||||
request := <-client.waitingReqs
|
||||
if msgType == '*' {
|
||||
request.reply = reply.MakeMultiBulkReply(args)
|
||||
reply := reply.MakeMultiBulkReply(args)
|
||||
client.finishRequest(reply)
|
||||
} else if msgType == '$' {
|
||||
request.reply = reply.MakeBulkReply(args[0])
|
||||
reply := reply.MakeBulkReply(args[0])
|
||||
client.finishRequest(reply)
|
||||
}
|
||||
|
||||
if request.waiting != nil {
|
||||
request.waiting.Done()
|
||||
}
|
||||
|
||||
// finish reply
|
||||
expectedArgsCount = 0
|
||||
|
Reference in New Issue
Block a user