From 66172721c5ee33abd8bff6fcc6f4378256458145 Mon Sep 17 00:00:00 2001 From: hdt3213 Date: Sun, 9 May 2021 22:42:09 +0800 Subject: [PATCH] refine code --- cluster/client_pool.go | 12 +++--- cluster/cluster.go | 23 ++++++---- cluster/com.go | 28 ++++++------ cluster/com_test.go | 6 +-- cluster/del.go | 28 ++++++------ cluster/keys.go | 4 +- cluster/mset.go | 33 +++++++------- cluster/pubsub.go | 10 +++-- cluster/rename.go | 8 ++-- cluster/router.go | 14 +++--- cluster/transaction.go | 85 ++++++++++++++++++------------------- cluster/transaction_test.go | 22 +++++----- go.mod | 1 + go.sum | 14 ++++++ pubsub/pubsub.go | 1 + 15 files changed, 160 insertions(+), 129 deletions(-) diff --git a/cluster/client_pool.go b/cluster/client_pool.go index f99a538..ec2b754 100644 --- a/cluster/client_pool.go +++ b/cluster/client_pool.go @@ -9,11 +9,11 @@ import ( "github.com/jolestar/go-commons-pool/v2" ) -type ConnectionFactory struct { +type connectionFactory struct { Peer string } -func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { +func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { c, err := client.MakeClient(f.Peer) if err != nil { return nil, err @@ -26,7 +26,7 @@ func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, return pool.NewPooledObject(c), nil } -func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { +func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { c, ok := object.Object.(*client.Client) if !ok { return errors.New("type mismatch") @@ -35,17 +35,17 @@ func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.Pool return nil } -func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { +func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { // do validate return true } -func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { +func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { // do activate return nil } -func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { +func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { // do passivate return nil } diff --git a/cluster/cluster.go b/cluster/cluster.go index fca295d..0b3e7fb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,3 +1,4 @@ +// Package cluster provides a server side cluster which is transparent to client. You can connect to any node in the cluster to access all data in the cluster package cluster import ( @@ -16,6 +17,8 @@ import ( "strings" ) +// Cluster represents a node of godis cluster +// it holds part of data and coordinates other nodes to finish transactions type Cluster struct { self string @@ -37,7 +40,7 @@ const ( // if only one node involved in a transaction, just execute the command don't apply tcc procedure var allowFastTransaction = true -// start current processing as a node of cluster +// MakeCluster creates and starts a node of cluster func MakeCluster() *Cluster { cluster := &Cluster{ self: config.Properties.Self, @@ -62,7 +65,7 @@ func MakeCluster() *Cluster { cluster.peerPicker.AddNode(nodes...) 
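// ---------------------------------------------------------------------------
// Editorial aside (not part of the patch): a minimal sketch of how the now
// unexported connectionFactory is consumed through go-commons-pool, assuming
// the imports already used in cluster/client_pool.go (context, client, pool).
// The helper name examplePooledCall and the peer address are illustrative;
// BorrowObject/ReturnObject and client.Client.Send follow the usage visible
// in cluster/com.go.
func examplePooledCall() {
	ctx := context.Background()
	// each peer gets its own pool; MakeObject dials the peer on demand
	connPool := pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{Peer: "127.0.0.1:6379"})
	obj, err := connPool.BorrowObject(ctx)
	if err != nil {
		return
	}
	peerClient := obj.(*client.Client) // the factory pools *client.Client values
	defer func() {
		_ = connPool.ReturnObject(ctx, peerClient) // hand the connection back for reuse
	}()
	_ = peerClient.Send([][]byte{[]byte("PING")})
}
// ---------------------------------------------------------------------------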
ctx := context.Background() for _, peer := range config.Properties.Peers { - cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{ + cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{ Peer: peer, }) } @@ -73,11 +76,12 @@ func MakeCluster() *Cluster { // CmdFunc represents the handler of a redis command type CmdFunc func(cluster *Cluster, c redis.Connection, cmdAndArgs [][]byte) redis.Reply +// Close stops current node of cluster func (cluster *Cluster) Close() { cluster.db.Close() } -var router = MakeRouter() +var router = makeRouter() func isAuthenticated(c redis.Connection) bool { if config.Properties.RequirePass == "" { @@ -86,16 +90,17 @@ func isAuthenticated(c redis.Connection) bool { return c.GetPassword() == config.Properties.RequirePass } -func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Reply) { +// Exec executes command on cluster +func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) { defer func() { if err := recover(); err != nil { logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) result = &reply.UnknownErrReply{} } }() - cmd := strings.ToLower(string(args[0])) + cmd := strings.ToLower(string(cmdArgs[0])) if cmd == "auth" { - return godis.Auth(cluster.db, c, args[1:]) + return godis.Auth(cluster.db, c, cmdArgs[1:]) } if !isAuthenticated(c) { return reply.MakeErrReply("NOAUTH Authentication required") @@ -104,14 +109,16 @@ func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Re if !ok { return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode") } - result = cmdFunc(cluster, c, args) + result = cmdFunc(cluster, c, cmdArgs) return } +// AfterClientClose does some clean after client close connection func (cluster *Cluster) AfterClientClose(c redis.Connection) { + cluster.db.AfterClientClose(c) } -func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { +func ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return godis.Ping(cluster.db, args[1:]) } diff --git a/cluster/com.go b/cluster/com.go index 6d658b7..70aeb25 100644 --- a/cluster/com.go +++ b/cluster/com.go @@ -1,4 +1,3 @@ -// communicate with peers within cluster package cluster import ( @@ -33,29 +32,28 @@ func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) return connectionFactory.ReturnObject(context.Background(), peerClient) } -// relay command to peer +// relay relays command to peer // cannot call Prepare, Commit, Rollback of self node -func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply { +func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply { if peer == cluster.self { // to self db return cluster.db.Exec(c, args) - } else { - peerClient, err := cluster.getPeerClient(peer) - if err != nil { - return reply.MakeErrReply(err.Error()) - } - defer func() { - _ = cluster.returnPeerClient(peer, peerClient) - }() - return peerClient.Send(args) } + peerClient, err := cluster.getPeerClient(peer) + if err != nil { + return reply.MakeErrReply(err.Error()) + } + defer func() { + _ = cluster.returnPeerClient(peer, peerClient) + }() + return peerClient.Send(args) } -// broadcast command to all node in cluster -func (cluster *Cluster) Broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply { +// broadcast broadcasts command to all node in cluster 
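// ---------------------------------------------------------------------------
// Editorial aside (not part of the patch): relay targets the single node that
// owns a key, while broadcast below fans a command out to every node and
// returns the replies keyed by peer address. A caller such as FlushDB in
// cluster/keys.go consumes it roughly like this (sketch, error handling elided):
//
//	replies := cluster.broadcast(c, makeArgs("FLUSHALL"))
//	for peer, r := range replies {
//		if reply.IsErrorReply(r) {
//			// surface "error occurs: ..." for the failing peer
//			_ = peer
//		}
//	}
// ---------------------------------------------------------------------------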
+func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply { result := make(map[string]redis.Reply) for _, node := range cluster.nodes { - reply := cluster.Relay(node, c, args) + reply := cluster.relay(node, c, args) result[node] = reply } return result diff --git a/cluster/com_test.go b/cluster/com_test.go index df4daed..ebffe65 100644 --- a/cluster/com_test.go +++ b/cluster/com_test.go @@ -38,9 +38,9 @@ func TestRelay(t *testing.T) { testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"}) key := RandString(4) value := RandString(4) - ret := testCluster2.Relay("127.0.0.1:6379", nil, toArgs("SET", key, value)) + ret := testCluster2.relay("127.0.0.1:6379", nil, toArgs("SET", key, value)) asserts.AssertNotError(t, ret) - ret = testCluster2.Relay("127.0.0.1:6379", nil, toArgs("GET", key)) + ret = testCluster2.relay("127.0.0.1:6379", nil, toArgs("GET", key)) asserts.AssertBulkReply(t, ret, value) } @@ -48,7 +48,7 @@ func TestBroadcast(t *testing.T) { testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"}) key := RandString(4) value := RandString(4) - rets := testCluster2.Broadcast(nil, toArgs("SET", key, value)) + rets := testCluster2.broadcast(nil, toArgs("SET", key, value)) for _, v := range rets { asserts.AssertNotError(t, v) } diff --git a/cluster/del.go b/cluster/del.go index 96514cb..0a2fe7b 100644 --- a/cluster/del.go +++ b/cluster/del.go @@ -6,6 +6,8 @@ import ( "strconv" ) +// Del atomically removes given keys from cluster, keys can be distributed on any node +// if the given keys are distributed on different node, Del will use try-commit-catch to remove them func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'del' command") @@ -17,22 +19,22 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { groupMap := cluster.groupBy(keys) if len(groupMap) == 1 && allowFastTransaction { // do fast for peer, group := range groupMap { // only one group - return cluster.Relay(peer, c, makeArgs("DEL", group...)) + return cluster.relay(peer, c, makeArgs("DEL", group...)) } } // prepare var errReply redis.Reply - txId := cluster.idGenerator.NextId() - txIdStr := strconv.FormatInt(txId, 10) + txID := cluster.idGenerator.NextId() + txIDStr := strconv.FormatInt(txID, 10) rollback := false for peer, group := range groupMap { - args := []string{txIdStr} + args := []string{txIDStr} args = append(args, group...) var resp redis.Reply if peer == cluster.self { - resp = PrepareDel(cluster, c, makeArgs("PrepareDel", args...)) + resp = prepareDel(cluster, c, makeArgs("PrepareDel", args...)) } else { - resp = cluster.Relay(peer, c, makeArgs("PrepareDel", args...)) + resp = cluster.relay(peer, c, makeArgs("PrepareDel", args...)) } if reply.IsErrorReply(resp) { errReply = resp @@ -43,10 +45,10 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { var respList []redis.Reply if rollback { // rollback - RequestRollback(cluster, c, txId, groupMap) + requestRollback(cluster, c, txID, groupMap) } else { // commit - respList, errReply = RequestCommit(cluster, c, txId, groupMap) + respList, errReply = requestCommit(cluster, c, txID, groupMap) if errReply != nil { rollback = true } @@ -63,19 +65,19 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } // args: PrepareDel id keys... 
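// ---------------------------------------------------------------------------
// Illustration (not part of the patch): the message flow Del drives when the
// keys of "DEL k1 k2" live on two different nodes A and B.
//
//	coordinator -> A: PrepareDel <txID> k1    (A locks k1, records an undo log)
//	coordinator -> B: PrepareDel <txID> k2    (B locks k2, records an undo log)
//	all prepares ok:   commit <txID>   to A and B   (the DEL is applied)
//	any prepare fails: rollback <txID> to A and B   (undo logs are replayed)
// ---------------------------------------------------------------------------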
-func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { +func prepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 3 { return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command") } - txId := string(args[1]) + txID := string(args[1]) keys := make([]string, 0, len(args)-2) for i := 2; i < len(args); i++ { arg := args[i] keys = append(keys, string(arg)) } txArgs := makeArgs("DEL", keys...) // actual args for cluster.db - tx := NewTransaction(cluster, c, txId, txArgs, keys) - cluster.transactions.Put(txId, tx) + tx := NewTransaction(cluster, c, txID, txArgs, keys) + cluster.transactions.Put(txID, tx) err := tx.prepare() if err != nil { return reply.MakeErrReply(err.Error()) @@ -84,7 +86,7 @@ func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply } // invoker should provide lock -func CommitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { +func commitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { keys := make([]string, len(tx.args)) for i, v := range tx.args { keys[i] = string(v) diff --git a/cluster/keys.go b/cluster/keys.go index 403473f..a1c8a61 100644 --- a/cluster/keys.go +++ b/cluster/keys.go @@ -5,8 +5,9 @@ import ( "github.com/hdt3213/godis/redis/reply" ) +// FlushDB removes all data in current database func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - replies := cluster.Broadcast(c, args) + replies := cluster.broadcast(c, args) var errReply reply.ErrorReply for _, v := range replies { if reply.IsErrorReply(v) { @@ -20,6 +21,7 @@ func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return reply.MakeErrReply("error occurs: " + errReply.Error()) } +// FlushAll removes all data in cluster func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return FlushDB(cluster, c, args) } diff --git a/cluster/mset.go b/cluster/mset.go index 9c60e5f..076b8cd 100644 --- a/cluster/mset.go +++ b/cluster/mset.go @@ -8,6 +8,7 @@ import ( "strconv" ) +// MGet atomically get multi key-value from cluster, keys can be distributed on any node func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command") @@ -20,7 +21,7 @@ func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { resultMap := make(map[string][]byte) groupMap := cluster.groupBy(keys) for peer, group := range groupMap { - resp := cluster.Relay(peer, c, makeArgs("MGET", group...)) + resp := cluster.relay(peer, c, makeArgs("MGET", group...)) if reply.IsErrorReply(resp) { errReply := resp.(reply.ErrorReply) return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error())) @@ -39,11 +40,11 @@ func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } // args: PrepareMSet id keys... 
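// ---------------------------------------------------------------------------
// Note (not part of the patch): the coordinator side in MSet sends the
// transaction id followed by the key-value pairs owned by the target node,
// e.g. "PrepareMSet <txID> k1 v1 k2 v2"; prepareMSet below reads every second
// argument after the id as a key, locks those keys, and then waits for the
// coordinator's commit or rollback.
// ---------------------------------------------------------------------------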
-func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { +func prepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) < 3 { return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command") } - txId := string(args[1]) + txID := string(args[1]) size := (len(args) - 2) / 2 keys := make([]string, size) for i := 0; i < size; i++ { @@ -54,8 +55,8 @@ func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Repl []byte("MSet"), } // actual args for cluster.db txArgs = append(txArgs, args[2:]...) - tx := NewTransaction(cluster, c, txId, txArgs, keys) - cluster.transactions.Put(txId, tx) + tx := NewTransaction(cluster, c, txID, txArgs, keys) + cluster.transactions.Put(txID, tx) err := tx.prepare() if err != nil { return reply.MakeErrReply(err.Error()) @@ -64,7 +65,7 @@ func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Repl } // invoker should provide lock -func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { +func commitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply { size := len(tx.args) / 2 keys := make([]string, size) values := make([][]byte, size) @@ -80,6 +81,7 @@ func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Rep return &reply.OkReply{} } +// MSet atomically sets multi key-value in cluster, keys can be distributed on any node func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { argCount := len(args) - 1 if argCount%2 != 0 || argCount < 1 { @@ -97,25 +99,25 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { groupMap := cluster.groupBy(keys) if len(groupMap) == 1 && allowFastTransaction { // do fast for peer := range groupMap { - return cluster.Relay(peer, c, args) + return cluster.relay(peer, c, args) } } //prepare var errReply redis.Reply - txId := cluster.idGenerator.NextId() - txIdStr := strconv.FormatInt(txId, 10) + txID := cluster.idGenerator.NextId() + txIDStr := strconv.FormatInt(txID, 10) rollback := false for peer, group := range groupMap { - peerArgs := []string{txIdStr} + peerArgs := []string{txIDStr} for _, k := range group { peerArgs = append(peerArgs, k, valueMap[k]) } var resp redis.Reply if peer == cluster.self { - resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...)) + resp = prepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...)) } else { - resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...)) + resp = cluster.relay(peer, c, makeArgs("PrepareMSet", peerArgs...)) } if reply.IsErrorReply(resp) { errReply = resp @@ -125,9 +127,9 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } if rollback { // rollback - RequestRollback(cluster, c, txId, groupMap) + requestRollback(cluster, c, txID, groupMap) } else { - _, errReply = RequestCommit(cluster, c, txId, groupMap) + _, errReply = requestCommit(cluster, c, txID, groupMap) rollback = errReply != nil } if !rollback { @@ -137,6 +139,7 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } +// MSetNX sets multi key-value in database, only if none of the given keys exist and all given keys are on the same node func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { argCount := len(args) - 1 if argCount%2 != 0 || argCount < 1 { @@ -155,5 +158,5 @@ func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } } } - return cluster.Relay(peer, c, 
args) + return cluster.relay(peer, c, args) } diff --git a/cluster/pubsub.go b/cluster/pubsub.go index f0e6265..5cdf511 100644 --- a/cluster/pubsub.go +++ b/cluster/pubsub.go @@ -16,10 +16,10 @@ var ( publishCmd = []byte(publish) ) -// broadcast msg to all peers in cluster when receive publish command from client +// Publish broadcasts msg to all peers in cluster when receive publish command from client func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { var count int64 = 0 - results := cluster.Broadcast(c, args) + results := cluster.broadcast(c, args) for _, val := range results { if errReply, ok := val.(reply.ErrorReply); ok { logger.Error("publish occurs error: " + errReply.Error()) @@ -30,16 +30,18 @@ func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return reply.MakeIntReply(count) } -// receive publish command from peer, just publish to local subscribing clients, do not relay to peers -func OnRelayedPublish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { +// onRelayedPublish receives publish command from peer, just publish to local subscribing clients, do not relay to peers +func onRelayedPublish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { args[0] = publishCmd return cluster.db.Exec(c, args) // let local db.hub handle publish } +// Subscribe puts the given connection into the given channel func Subscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return cluster.db.Exec(c, args) // let local db.hub handle subscribe } +// UnSubscribe removes the given connection from the given channel func UnSubscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return cluster.db.Exec(c, args) // let local db.hub handle subscribe } diff --git a/cluster/rename.go b/cluster/rename.go index d91d8f0..ef40246 100644 --- a/cluster/rename.go +++ b/cluster/rename.go @@ -5,7 +5,7 @@ import ( "github.com/hdt3213/godis/redis/reply" ) -// Rename renames a key, the origin and the destination must within the same slot +// Rename renames a key, the origin and the destination must within the same node func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 3 { return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command") @@ -19,9 +19,11 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if srcPeer != destPeer { return reply.MakeErrReply("ERR rename must within one slot in cluster mode") } - return cluster.Relay(srcPeer, c, args) + return cluster.relay(srcPeer, c, args) } +// RenameNx renames a key, only if the new key does not exist. 
+// The origin and the destination must within the same node func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 3 { return reply.MakeErrReply("ERR wrong number of arguments for 'renamenx' command") @@ -35,5 +37,5 @@ func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if srcPeer != destPeer { return reply.MakeErrReply("ERR rename must within one slot in cluster mode") } - return cluster.Relay(srcPeer, c, args) + return cluster.relay(srcPeer, c, args) } diff --git a/cluster/router.go b/cluster/router.go index 7d4fe2e..7d06d8d 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -2,15 +2,15 @@ package cluster import "github.com/hdt3213/godis/interface/redis" -func MakeRouter() map[string]CmdFunc { +func makeRouter() map[string]CmdFunc { routerMap := make(map[string]CmdFunc) - routerMap["ping"] = Ping + routerMap["ping"] = ping - routerMap["commit"] = Commit + routerMap["commit"] = commit routerMap["rollback"] = Rollback routerMap["del"] = Del - routerMap["preparedel"] = PrepareDel - routerMap["preparemset"] = PrepareMSet + routerMap["preparedel"] = prepareDel + routerMap["preparemset"] = prepareMSet routerMap["expire"] = defaultFunc routerMap["expireat"] = defaultFunc @@ -102,7 +102,7 @@ func MakeRouter() map[string]CmdFunc { routerMap["georadiusbymember"] = defaultFunc routerMap["publish"] = Publish - routerMap[relayPublish] = OnRelayedPublish + routerMap[relayPublish] = onRelayedPublish routerMap["subscribe"] = Subscribe routerMap["unsubscribe"] = UnSubscribe @@ -117,5 +117,5 @@ func MakeRouter() map[string]CmdFunc { func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { key := string(args[1]) peer := cluster.peerPicker.PickNode(key) - return cluster.Relay(peer, c, args) + return cluster.relay(peer, c, args) } diff --git a/cluster/transaction.go b/cluster/transaction.go index c69fb84..1ebff29 100644 --- a/cluster/transaction.go +++ b/cluster/transaction.go @@ -1,7 +1,6 @@ package cluster import ( - "errors" "fmt" "github.com/hdt3213/godis" "github.com/hdt3213/godis/interface/redis" @@ -14,6 +13,7 @@ import ( "time" ) +// Transaction stores state and data for a try-commit-catch distributed transaction type Transaction struct { id string // transaction id args [][]byte // cmd args @@ -32,16 +32,17 @@ const ( maxLockTime = 3 * time.Second waitBeforeCleanTx = 2 * maxLockTime - CreatedStatus = 0 - PreparedStatus = 1 - CommittedStatus = 2 - RolledBackStatus = 3 + createdStatus = 0 + preparedStatus = 1 + committedStatus = 2 + rolledBackStatus = 3 ) -func genTaskKey(txId string) string { - return "tx:" + txId +func genTaskKey(txID string) string { + return "tx:" + txID } +// NewTransaction creates a try-commit-catch distributed transaction func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]byte, keys []string) *Transaction { return &Transaction{ id: id, @@ -49,7 +50,7 @@ func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]by cluster: cluster, conn: c, keys: keys, - status: CreatedStatus, + status: createdStatus, mu: new(sync.Mutex), } } @@ -88,10 +89,10 @@ func (tx *Transaction) prepare() error { tx.undoLog[key] = nil // entity was nil, should be removed while rollback } } - tx.status = PreparedStatus + tx.status = preparedStatus taskKey := genTaskKey(tx.id) timewheel.Delay(maxLockTime, taskKey, func() { - if tx.status == PreparedStatus { // rollback transaction uncommitted until expire + if tx.status == preparedStatus { // rollback 
transaction uncommitted until expire logger.Info("abort transaction: " + tx.id) _ = tx.rollback() } @@ -105,9 +106,9 @@ func (tx *Transaction) rollback() error { defer tx.mu.Unlock() if tx.status != curStatus { // ensure status not changed by other goroutine - return errors.New(fmt.Sprintf("tx %s status changed", tx.id)) + return fmt.Errorf("tx %s status changed", tx.id) } - if tx.status == RolledBackStatus { // no need to rollback a rolled-back transaction + if tx.status == rolledBackStatus { // no need to rollback a rolled-back transaction return nil } tx.lockKeys() @@ -120,17 +121,17 @@ func (tx *Transaction) rollback() error { } } tx.unLockKeys() - tx.status = RolledBackStatus + tx.status = rolledBackStatus return nil } -// rollback local transaction +// Rollback rollbacks local transaction func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command") } - txId := string(args[1]) - raw, ok := cluster.transactions.Get(txId) + txID := string(args[1]) + raw, ok := cluster.transactions.Get(txID) if !ok { return reply.MakeIntReply(0) } @@ -146,13 +147,13 @@ func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return reply.MakeIntReply(1) } -// commit local transaction as a worker -func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { +// commit commits local transaction as a worker when receive commit command from coordinator +func commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command") } - txId := string(args[1]) - raw, ok := cluster.transactions.Get(txId) + txID := string(args[1]) + raw, ok := cluster.transactions.Get(txID) if !ok { return reply.MakeIntReply(0) } @@ -164,40 +165,38 @@ func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { cmd := strings.ToLower(string(tx.args[0])) var result redis.Reply if cmd == "del" { - result = CommitDel(cluster, c, tx) + result = commitDel(cluster, c, tx) } else if cmd == "mset" { - result = CommitMSet(cluster, c, tx) + result = commitMSet(cluster, c, tx) } if reply.IsErrorReply(result) { // failed err2 := tx.rollback() return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result)) - } else { - // after committed - tx.unLockKeys() - tx.status = CommittedStatus - // clean transaction - // do not clean immediately, in case rollback - timewheel.Delay(waitBeforeCleanTx, "", func() { - cluster.transactions.Remove(tx.id) - }) } - + // after committed + tx.unLockKeys() + tx.status = committedStatus + // clean finished transaction + // do not clean immediately, in case rollback + timewheel.Delay(waitBeforeCleanTx, "", func() { + cluster.transactions.Remove(tx.id) + }) return result } -// request all node commit transaction as leader -func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) { +// requestCommit commands all node commit transaction as coordinator +func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) { var errReply reply.ErrorReply - txIdStr := strconv.FormatInt(txId, 10) + txIDStr := strconv.FormatInt(txID, 10) respList := make([]redis.Reply, 0, len(peers)) for peer := range peers { var resp redis.Reply if peer == cluster.self { - resp = 
Commit(cluster, c, makeArgs("commit", txIdStr)) + resp = commit(cluster, c, makeArgs("commit", txIDStr)) } else { - resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr)) + resp = cluster.relay(peer, c, makeArgs("commit", txIDStr)) } if reply.IsErrorReply(resp) { errReply = resp.(reply.ErrorReply) @@ -206,20 +205,20 @@ func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[s respList = append(respList, resp) } if errReply != nil { - RequestRollback(cluster, c, txId, peers) + requestRollback(cluster, c, txID, peers) return nil, errReply } return respList, nil } -// request all node rollback transaction as leader -func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) { - txIdStr := strconv.FormatInt(txId, 10) +// requestRollback requests all node rollback transaction as coordinator +func requestRollback(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) { + txIDStr := strconv.FormatInt(txID, 10) for peer := range peers { if peer == cluster.self { - Rollback(cluster, c, makeArgs("rollback", txIdStr)) + Rollback(cluster, c, makeArgs("rollback", txIDStr)) } else { - cluster.Relay(peer, c, makeArgs("rollback", txIdStr)) + cluster.relay(peer, c, makeArgs("rollback", txIDStr)) } } } diff --git a/cluster/transaction_test.go b/cluster/transaction_test.go index 2134e85..10fc5fb 100644 --- a/cluster/transaction_test.go +++ b/cluster/transaction_test.go @@ -10,36 +10,36 @@ import ( func TestRollback(t *testing.T) { // rollback uncommitted transaction FlushAll(testCluster, nil, toArgs("FLUSHALL")) - txId := rand.Int63() - txIdStr := strconv.FormatInt(txId, 10) + txID := rand.Int63() + txIDStr := strconv.FormatInt(txID, 10) keys := []string{"a", "b"} groupMap := testCluster.groupBy(keys) - args := []string{txIdStr} + args := []string{txIDStr} args = append(args, keys...) testCluster.Exec(nil, toArgs("SET", "a", "a")) - ret := PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...)) + ret := prepareDel(testCluster, nil, makeArgs("PrepareDel", args...)) asserts.AssertNotError(t, ret) - RequestRollback(testCluster, nil, txId, groupMap) + requestRollback(testCluster, nil, txID, groupMap) ret = testCluster.Exec(nil, toArgs("GET", "a")) asserts.AssertBulkReply(t, ret, "a") // rollback committed transaction FlushAll(testCluster, nil, toArgs("FLUSHALL")) - txId = rand.Int63() - txIdStr = strconv.FormatInt(txId, 10) - args = []string{txIdStr} + txID = rand.Int63() + txIDStr = strconv.FormatInt(txID, 10) + args = []string{txIDStr} args = append(args, keys...) 
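	// --------------------------------------------------------------------
	// Note (not part of the patch): rolling back an already committed
	// transaction works in this test because commit keeps the Transaction
	// (and its undo log) around for waitBeforeCleanTx before removing it,
	// so requestRollback within that window restores the deleted value.
	// --------------------------------------------------------------------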
testCluster.Exec(nil, toArgs("SET", "a", "a")) - ret = PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...)) + ret = prepareDel(testCluster, nil, makeArgs("PrepareDel", args...)) asserts.AssertNotError(t, ret) - _, err := RequestCommit(testCluster, nil, txId, groupMap) + _, err := requestCommit(testCluster, nil, txID, groupMap) if err != nil { t.Errorf("del failed %v", err) return } ret = testCluster.Exec(nil, toArgs("GET", "a")) asserts.AssertNullBulk(t, ret) - RequestRollback(testCluster, nil, txId, groupMap) + requestRollback(testCluster, nil, txID, groupMap) ret = testCluster.Exec(nil, toArgs("GET", "a")) asserts.AssertBulkReply(t, ret, "a") } diff --git a/go.mod b/go.mod index 374f8ea..0447b4f 100644 --- a/go.mod +++ b/go.mod @@ -6,4 +6,5 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.1 github.com/shopspring/decimal v1.2.0 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect + golang.org/x/tools v0.1.0 // indirect ) diff --git a/go.sum b/go.sum index 94cbceb..2d99042 100644 --- a/go.sum +++ b/go.sum @@ -13,20 +13,34 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 0465b2e..84648a1 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -65,6 +65,7 @@ func unsubscribe0(hub *Hub, channel string, client redis.Connection) bool { return false } +// Subscribe puts the given connection into the given channel func Subscribe(hub *Hub, c redis.Connection, args [][]byte) redis.Reply { channels := make([]string, len(args)) for i, b := range args {
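// ---------------------------------------------------------------------------
// For orientation (not part of the patch): worker-side life cycle of a
// Transaction after this change, as implemented in cluster/transaction.go.
//
//	createdStatus  --prepare()--> preparedStatus    locks keys, records the undo log,
//	                                                schedules auto-rollback after maxLockTime
//	preparedStatus --commit-----> committedStatus   unlocks keys, keeps the tx for
//	                                                waitBeforeCleanTx in case a rollback follows
//	preparedStatus --rollback---> rolledBackStatus  replays the undo log, unlocks keys
// ---------------------------------------------------------------------------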