bug fix: do not client during closing

This commit is contained in:
finley
2022-07-04 20:50:16 +08:00
parent 5c85295b02
commit 9e155a617c
2 changed files with 23 additions and 4 deletions

View File

@@ -11,9 +11,16 @@ import (
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
const (
created = iota
running
closed
)
// Client is a pipeline mode redis client // Client is a pipeline mode redis client
type Client struct { type Client struct {
conn net.Conn conn net.Conn
@@ -22,6 +29,7 @@ type Client struct {
ticker *time.Ticker ticker *time.Ticker
addr string addr string
status int32
working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting) working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
} }
@@ -61,10 +69,12 @@ func (client *Client) Start() {
go client.handleWrite() go client.handleWrite()
go client.handleRead() go client.handleRead()
go client.heartbeat() go client.heartbeat()
atomic.StoreInt32(&client.status, running)
} }
// Close stops asynchronous goroutines and close connection // Close stops asynchronous goroutines and close connection
func (client *Client) Close() { func (client *Client) Close() {
atomic.StoreInt32(&client.status, closed)
client.ticker.Stop() client.ticker.Stop()
// stop new request // stop new request
close(client.pendingReqs) close(client.pendingReqs)
@@ -78,6 +88,7 @@ func (client *Client) Close() {
} }
func (client *Client) reconnect() { func (client *Client) reconnect() {
logger.Info("reconnect with: " + client.addr)
_ = client.conn.Close() // ignore possible errors from repeated closes _ = client.conn.Close() // ignore possible errors from repeated closes
var conn net.Conn var conn net.Conn
@@ -97,7 +108,7 @@ func (client *Client) reconnect() {
return return
} }
client.conn = conn client.conn = conn
//
close(client.waitingReqs) close(client.waitingReqs)
for req := range client.waitingReqs { for req := range client.waitingReqs {
req.err = errors.New("connection closed") req.err = errors.New("connection closed")
@@ -122,6 +133,9 @@ func (client *Client) handleWrite() {
// Send sends a request to redis server // Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply { func (client *Client) Send(args [][]byte) redis.Reply {
if atomic.LoadInt32(&client.status) != running {
return protocol.MakeErrReply("client closed")
}
request := &request{ request := &request{
args: args, args: args,
heartbeat: false, heartbeat: false,
@@ -198,6 +212,10 @@ func (client *Client) handleRead() {
ch := parser.ParseStream(client.conn) ch := parser.ParseStream(client.conn)
for payload := range ch { for payload := range ch {
if payload.Err != nil { if payload.Err != nil {
status := atomic.LoadInt32(&client.status)
if status == closed {
return
}
client.reconnect() client.reconnect()
return return
} }

View File

@@ -3,7 +3,9 @@ package client
import ( import (
"bytes" "bytes"
"github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol"
"github.com/hdt3213/godis/redis/protocol/asserts"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@@ -105,6 +107,8 @@ func TestClient(t *testing.T) {
} }
client.Close() client.Close()
ret := client.Send(utils.ToCmdLine("ping"))
asserts.AssertErrReply(t, ret, "client closed")
} }
func TestReconnect(t *testing.T) { func TestReconnect(t *testing.T) {
@@ -135,7 +139,4 @@ func TestReconnect(t *testing.T) {
if !success { if !success {
t.Error("reconnect error") t.Error("reconnect error")
} }
//var wg sync.WaitGroup
//wg.Add(1)
//wg.Wait()
} }