mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-22 16:29:42 +08:00
refine code
This commit is contained in:
@@ -9,11 +9,11 @@ import (
|
||||
"github.com/jolestar/go-commons-pool/v2"
|
||||
)
|
||||
|
||||
type ConnectionFactory struct {
|
||||
type connectionFactory struct {
|
||||
Peer string
|
||||
}
|
||||
|
||||
func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
|
||||
func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
|
||||
c, err := client.MakeClient(f.Peer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -26,7 +26,7 @@ func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject,
|
||||
return pool.NewPooledObject(c), nil
|
||||
}
|
||||
|
||||
func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
c, ok := object.Object.(*client.Client)
|
||||
if !ok {
|
||||
return errors.New("type mismatch")
|
||||
@@ -35,17 +35,17 @@ func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.Pool
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
|
||||
func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
|
||||
// do validate
|
||||
return true
|
||||
}
|
||||
|
||||
func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
// do activate
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
|
||||
// do passivate
|
||||
return nil
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
// Package cluster provides a server side cluster which is transparent to client. You can connect to any node in the cluster to access all data in the cluster
|
||||
package cluster
|
||||
|
||||
import (
|
||||
@@ -16,6 +17,8 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Cluster represents a node of godis cluster
|
||||
// it holds part of data and coordinates other nodes to finish transactions
|
||||
type Cluster struct {
|
||||
self string
|
||||
|
||||
@@ -37,7 +40,7 @@ const (
|
||||
// if only one node involved in a transaction, just execute the command don't apply tcc procedure
|
||||
var allowFastTransaction = true
|
||||
|
||||
// start current processing as a node of cluster
|
||||
// MakeCluster creates and starts a node of cluster
|
||||
func MakeCluster() *Cluster {
|
||||
cluster := &Cluster{
|
||||
self: config.Properties.Self,
|
||||
@@ -62,7 +65,7 @@ func MakeCluster() *Cluster {
|
||||
cluster.peerPicker.AddNode(nodes...)
|
||||
ctx := context.Background()
|
||||
for _, peer := range config.Properties.Peers {
|
||||
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{
|
||||
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
|
||||
Peer: peer,
|
||||
})
|
||||
}
|
||||
@@ -73,11 +76,12 @@ func MakeCluster() *Cluster {
|
||||
// CmdFunc represents the handler of a redis command
|
||||
type CmdFunc func(cluster *Cluster, c redis.Connection, cmdAndArgs [][]byte) redis.Reply
|
||||
|
||||
// Close stops current node of cluster
|
||||
func (cluster *Cluster) Close() {
|
||||
cluster.db.Close()
|
||||
}
|
||||
|
||||
var router = MakeRouter()
|
||||
var router = makeRouter()
|
||||
|
||||
func isAuthenticated(c redis.Connection) bool {
|
||||
if config.Properties.RequirePass == "" {
|
||||
@@ -86,16 +90,17 @@ func isAuthenticated(c redis.Connection) bool {
|
||||
return c.GetPassword() == config.Properties.RequirePass
|
||||
}
|
||||
|
||||
func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Reply) {
|
||||
// Exec executes command on cluster
|
||||
func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
|
||||
result = &reply.UnknownErrReply{}
|
||||
}
|
||||
}()
|
||||
cmd := strings.ToLower(string(args[0]))
|
||||
cmd := strings.ToLower(string(cmdArgs[0]))
|
||||
if cmd == "auth" {
|
||||
return godis.Auth(cluster.db, c, args[1:])
|
||||
return godis.Auth(cluster.db, c, cmdArgs[1:])
|
||||
}
|
||||
if !isAuthenticated(c) {
|
||||
return reply.MakeErrReply("NOAUTH Authentication required")
|
||||
@@ -104,14 +109,16 @@ func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Re
|
||||
if !ok {
|
||||
return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode")
|
||||
}
|
||||
result = cmdFunc(cluster, c, args)
|
||||
result = cmdFunc(cluster, c, cmdArgs)
|
||||
return
|
||||
}
|
||||
|
||||
// AfterClientClose does some clean after client close connection
|
||||
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
|
||||
cluster.db.AfterClientClose(c)
|
||||
}
|
||||
|
||||
func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
func ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return godis.Ping(cluster.db, args[1:])
|
||||
}
|
||||
|
||||
|
@@ -1,4 +1,3 @@
|
||||
// communicate with peers within cluster
|
||||
package cluster
|
||||
|
||||
import (
|
||||
@@ -33,13 +32,13 @@ func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client)
|
||||
return connectionFactory.ReturnObject(context.Background(), peerClient)
|
||||
}
|
||||
|
||||
// relay command to peer
|
||||
// relay relays command to peer
|
||||
// cannot call Prepare, Commit, Rollback of self node
|
||||
func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
|
||||
func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if peer == cluster.self {
|
||||
// to self db
|
||||
return cluster.db.Exec(c, args)
|
||||
} else {
|
||||
}
|
||||
peerClient, err := cluster.getPeerClient(peer)
|
||||
if err != nil {
|
||||
return reply.MakeErrReply(err.Error())
|
||||
@@ -48,14 +47,13 @@ func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) re
|
||||
_ = cluster.returnPeerClient(peer, peerClient)
|
||||
}()
|
||||
return peerClient.Send(args)
|
||||
}
|
||||
}
|
||||
|
||||
// broadcast command to all node in cluster
|
||||
func (cluster *Cluster) Broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
|
||||
// broadcast broadcasts command to all node in cluster
|
||||
func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
|
||||
result := make(map[string]redis.Reply)
|
||||
for _, node := range cluster.nodes {
|
||||
reply := cluster.Relay(node, c, args)
|
||||
reply := cluster.relay(node, c, args)
|
||||
result[node] = reply
|
||||
}
|
||||
return result
|
||||
|
@@ -38,9 +38,9 @@ func TestRelay(t *testing.T) {
|
||||
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
|
||||
key := RandString(4)
|
||||
value := RandString(4)
|
||||
ret := testCluster2.Relay("127.0.0.1:6379", nil, toArgs("SET", key, value))
|
||||
ret := testCluster2.relay("127.0.0.1:6379", nil, toArgs("SET", key, value))
|
||||
asserts.AssertNotError(t, ret)
|
||||
ret = testCluster2.Relay("127.0.0.1:6379", nil, toArgs("GET", key))
|
||||
ret = testCluster2.relay("127.0.0.1:6379", nil, toArgs("GET", key))
|
||||
asserts.AssertBulkReply(t, ret, value)
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestBroadcast(t *testing.T) {
|
||||
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
|
||||
key := RandString(4)
|
||||
value := RandString(4)
|
||||
rets := testCluster2.Broadcast(nil, toArgs("SET", key, value))
|
||||
rets := testCluster2.broadcast(nil, toArgs("SET", key, value))
|
||||
for _, v := range rets {
|
||||
asserts.AssertNotError(t, v)
|
||||
}
|
||||
|
@@ -6,6 +6,8 @@ import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Del atomically removes given keys from cluster, keys can be distributed on any node
|
||||
// if the given keys are distributed on different node, Del will use try-commit-catch to remove them
|
||||
func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
|
||||
@@ -17,22 +19,22 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
groupMap := cluster.groupBy(keys)
|
||||
if len(groupMap) == 1 && allowFastTransaction { // do fast
|
||||
for peer, group := range groupMap { // only one group
|
||||
return cluster.Relay(peer, c, makeArgs("DEL", group...))
|
||||
return cluster.relay(peer, c, makeArgs("DEL", group...))
|
||||
}
|
||||
}
|
||||
// prepare
|
||||
var errReply redis.Reply
|
||||
txId := cluster.idGenerator.NextId()
|
||||
txIdStr := strconv.FormatInt(txId, 10)
|
||||
txID := cluster.idGenerator.NextId()
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
rollback := false
|
||||
for peer, group := range groupMap {
|
||||
args := []string{txIdStr}
|
||||
args := []string{txIDStr}
|
||||
args = append(args, group...)
|
||||
var resp redis.Reply
|
||||
if peer == cluster.self {
|
||||
resp = PrepareDel(cluster, c, makeArgs("PrepareDel", args...))
|
||||
resp = prepareDel(cluster, c, makeArgs("PrepareDel", args...))
|
||||
} else {
|
||||
resp = cluster.Relay(peer, c, makeArgs("PrepareDel", args...))
|
||||
resp = cluster.relay(peer, c, makeArgs("PrepareDel", args...))
|
||||
}
|
||||
if reply.IsErrorReply(resp) {
|
||||
errReply = resp
|
||||
@@ -43,10 +45,10 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
var respList []redis.Reply
|
||||
if rollback {
|
||||
// rollback
|
||||
RequestRollback(cluster, c, txId, groupMap)
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
} else {
|
||||
// commit
|
||||
respList, errReply = RequestCommit(cluster, c, txId, groupMap)
|
||||
respList, errReply = requestCommit(cluster, c, txID, groupMap)
|
||||
if errReply != nil {
|
||||
rollback = true
|
||||
}
|
||||
@@ -63,19 +65,19 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
}
|
||||
|
||||
// args: PrepareDel id keys...
|
||||
func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
func prepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command")
|
||||
}
|
||||
txId := string(args[1])
|
||||
txID := string(args[1])
|
||||
keys := make([]string, 0, len(args)-2)
|
||||
for i := 2; i < len(args); i++ {
|
||||
arg := args[i]
|
||||
keys = append(keys, string(arg))
|
||||
}
|
||||
txArgs := makeArgs("DEL", keys...) // actual args for cluster.db
|
||||
tx := NewTransaction(cluster, c, txId, txArgs, keys)
|
||||
cluster.transactions.Put(txId, tx)
|
||||
tx := NewTransaction(cluster, c, txID, txArgs, keys)
|
||||
cluster.transactions.Put(txID, tx)
|
||||
err := tx.prepare()
|
||||
if err != nil {
|
||||
return reply.MakeErrReply(err.Error())
|
||||
@@ -84,7 +86,7 @@ func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply
|
||||
}
|
||||
|
||||
// invoker should provide lock
|
||||
func CommitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
|
||||
func commitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
|
||||
keys := make([]string, len(tx.args))
|
||||
for i, v := range tx.args {
|
||||
keys[i] = string(v)
|
||||
|
@@ -5,8 +5,9 @@ import (
|
||||
"github.com/hdt3213/godis/redis/reply"
|
||||
)
|
||||
|
||||
// FlushDB removes all data in current database
|
||||
func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
replies := cluster.Broadcast(c, args)
|
||||
replies := cluster.broadcast(c, args)
|
||||
var errReply reply.ErrorReply
|
||||
for _, v := range replies {
|
||||
if reply.IsErrorReply(v) {
|
||||
@@ -20,6 +21,7 @@ func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return reply.MakeErrReply("error occurs: " + errReply.Error())
|
||||
}
|
||||
|
||||
// FlushAll removes all data in cluster
|
||||
func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return FlushDB(cluster, c, args)
|
||||
}
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// MGet atomically get multi key-value from cluster, keys can be distributed on any node
|
||||
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
|
||||
@@ -20,7 +21,7 @@ func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
resultMap := make(map[string][]byte)
|
||||
groupMap := cluster.groupBy(keys)
|
||||
for peer, group := range groupMap {
|
||||
resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
|
||||
resp := cluster.relay(peer, c, makeArgs("MGET", group...))
|
||||
if reply.IsErrorReply(resp) {
|
||||
errReply := resp.(reply.ErrorReply)
|
||||
return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
|
||||
@@ -39,11 +40,11 @@ func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
}
|
||||
|
||||
// args: PrepareMSet id keys...
|
||||
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
func prepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")
|
||||
}
|
||||
txId := string(args[1])
|
||||
txID := string(args[1])
|
||||
size := (len(args) - 2) / 2
|
||||
keys := make([]string, size)
|
||||
for i := 0; i < size; i++ {
|
||||
@@ -54,8 +55,8 @@ func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Repl
|
||||
[]byte("MSet"),
|
||||
} // actual args for cluster.db
|
||||
txArgs = append(txArgs, args[2:]...)
|
||||
tx := NewTransaction(cluster, c, txId, txArgs, keys)
|
||||
cluster.transactions.Put(txId, tx)
|
||||
tx := NewTransaction(cluster, c, txID, txArgs, keys)
|
||||
cluster.transactions.Put(txID, tx)
|
||||
err := tx.prepare()
|
||||
if err != nil {
|
||||
return reply.MakeErrReply(err.Error())
|
||||
@@ -64,7 +65,7 @@ func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Repl
|
||||
}
|
||||
|
||||
// invoker should provide lock
|
||||
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
|
||||
func commitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
|
||||
size := len(tx.args) / 2
|
||||
keys := make([]string, size)
|
||||
values := make([][]byte, size)
|
||||
@@ -80,6 +81,7 @@ func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Rep
|
||||
return &reply.OkReply{}
|
||||
}
|
||||
|
||||
// MSet atomically sets multi key-value in cluster, keys can be distributed on any node
|
||||
func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
argCount := len(args) - 1
|
||||
if argCount%2 != 0 || argCount < 1 {
|
||||
@@ -97,25 +99,25 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
groupMap := cluster.groupBy(keys)
|
||||
if len(groupMap) == 1 && allowFastTransaction { // do fast
|
||||
for peer := range groupMap {
|
||||
return cluster.Relay(peer, c, args)
|
||||
return cluster.relay(peer, c, args)
|
||||
}
|
||||
}
|
||||
|
||||
//prepare
|
||||
var errReply redis.Reply
|
||||
txId := cluster.idGenerator.NextId()
|
||||
txIdStr := strconv.FormatInt(txId, 10)
|
||||
txID := cluster.idGenerator.NextId()
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
rollback := false
|
||||
for peer, group := range groupMap {
|
||||
peerArgs := []string{txIdStr}
|
||||
peerArgs := []string{txIDStr}
|
||||
for _, k := range group {
|
||||
peerArgs = append(peerArgs, k, valueMap[k])
|
||||
}
|
||||
var resp redis.Reply
|
||||
if peer == cluster.self {
|
||||
resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
|
||||
resp = prepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
|
||||
} else {
|
||||
resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
|
||||
resp = cluster.relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
|
||||
}
|
||||
if reply.IsErrorReply(resp) {
|
||||
errReply = resp
|
||||
@@ -125,9 +127,9 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
}
|
||||
if rollback {
|
||||
// rollback
|
||||
RequestRollback(cluster, c, txId, groupMap)
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
} else {
|
||||
_, errReply = RequestCommit(cluster, c, txId, groupMap)
|
||||
_, errReply = requestCommit(cluster, c, txID, groupMap)
|
||||
rollback = errReply != nil
|
||||
}
|
||||
if !rollback {
|
||||
@@ -137,6 +139,7 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
|
||||
}
|
||||
|
||||
// MSetNX sets multi key-value in database, only if none of the given keys exist and all given keys are on the same node
|
||||
func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
argCount := len(args) - 1
|
||||
if argCount%2 != 0 || argCount < 1 {
|
||||
@@ -155,5 +158,5 @@ func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
}
|
||||
}
|
||||
}
|
||||
return cluster.Relay(peer, c, args)
|
||||
return cluster.relay(peer, c, args)
|
||||
}
|
||||
|
@@ -16,10 +16,10 @@ var (
|
||||
publishCmd = []byte(publish)
|
||||
)
|
||||
|
||||
// broadcast msg to all peers in cluster when receive publish command from client
|
||||
// Publish broadcasts msg to all peers in cluster when receive publish command from client
|
||||
func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
var count int64 = 0
|
||||
results := cluster.Broadcast(c, args)
|
||||
results := cluster.broadcast(c, args)
|
||||
for _, val := range results {
|
||||
if errReply, ok := val.(reply.ErrorReply); ok {
|
||||
logger.Error("publish occurs error: " + errReply.Error())
|
||||
@@ -30,16 +30,18 @@ func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return reply.MakeIntReply(count)
|
||||
}
|
||||
|
||||
// receive publish command from peer, just publish to local subscribing clients, do not relay to peers
|
||||
func OnRelayedPublish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
// onRelayedPublish receives publish command from peer, just publish to local subscribing clients, do not relay to peers
|
||||
func onRelayedPublish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
args[0] = publishCmd
|
||||
return cluster.db.Exec(c, args) // let local db.hub handle publish
|
||||
}
|
||||
|
||||
// Subscribe puts the given connection into the given channel
|
||||
func Subscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return cluster.db.Exec(c, args) // let local db.hub handle subscribe
|
||||
}
|
||||
|
||||
// UnSubscribe removes the given connection from the given channel
|
||||
func UnSubscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return cluster.db.Exec(c, args) // let local db.hub handle subscribe
|
||||
}
|
||||
|
@@ -5,7 +5,7 @@ import (
|
||||
"github.com/hdt3213/godis/redis/reply"
|
||||
)
|
||||
|
||||
// Rename renames a key, the origin and the destination must within the same slot
|
||||
// Rename renames a key, the origin and the destination must within the same node
|
||||
func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
|
||||
@@ -19,9 +19,11 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if srcPeer != destPeer {
|
||||
return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
|
||||
}
|
||||
return cluster.Relay(srcPeer, c, args)
|
||||
return cluster.relay(srcPeer, c, args)
|
||||
}
|
||||
|
||||
// RenameNx renames a key, only if the new key does not exist.
|
||||
// The origin and the destination must within the same node
|
||||
func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
|
||||
@@ -35,5 +37,5 @@ func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if srcPeer != destPeer {
|
||||
return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
|
||||
}
|
||||
return cluster.Relay(srcPeer, c, args)
|
||||
return cluster.relay(srcPeer, c, args)
|
||||
}
|
||||
|
@@ -2,15 +2,15 @@ package cluster
|
||||
|
||||
import "github.com/hdt3213/godis/interface/redis"
|
||||
|
||||
func MakeRouter() map[string]CmdFunc {
|
||||
func makeRouter() map[string]CmdFunc {
|
||||
routerMap := make(map[string]CmdFunc)
|
||||
routerMap["ping"] = Ping
|
||||
routerMap["ping"] = ping
|
||||
|
||||
routerMap["commit"] = Commit
|
||||
routerMap["commit"] = commit
|
||||
routerMap["rollback"] = Rollback
|
||||
routerMap["del"] = Del
|
||||
routerMap["preparedel"] = PrepareDel
|
||||
routerMap["preparemset"] = PrepareMSet
|
||||
routerMap["preparedel"] = prepareDel
|
||||
routerMap["preparemset"] = prepareMSet
|
||||
|
||||
routerMap["expire"] = defaultFunc
|
||||
routerMap["expireat"] = defaultFunc
|
||||
@@ -102,7 +102,7 @@ func MakeRouter() map[string]CmdFunc {
|
||||
routerMap["georadiusbymember"] = defaultFunc
|
||||
|
||||
routerMap["publish"] = Publish
|
||||
routerMap[relayPublish] = OnRelayedPublish
|
||||
routerMap[relayPublish] = onRelayedPublish
|
||||
routerMap["subscribe"] = Subscribe
|
||||
routerMap["unsubscribe"] = UnSubscribe
|
||||
|
||||
@@ -117,5 +117,5 @@ func MakeRouter() map[string]CmdFunc {
|
||||
func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
key := string(args[1])
|
||||
peer := cluster.peerPicker.PickNode(key)
|
||||
return cluster.Relay(peer, c, args)
|
||||
return cluster.relay(peer, c, args)
|
||||
}
|
||||
|
@@ -1,7 +1,6 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/hdt3213/godis"
|
||||
"github.com/hdt3213/godis/interface/redis"
|
||||
@@ -14,6 +13,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Transaction stores state and data for a try-commit-catch distributed transaction
|
||||
type Transaction struct {
|
||||
id string // transaction id
|
||||
args [][]byte // cmd args
|
||||
@@ -32,16 +32,17 @@ const (
|
||||
maxLockTime = 3 * time.Second
|
||||
waitBeforeCleanTx = 2 * maxLockTime
|
||||
|
||||
CreatedStatus = 0
|
||||
PreparedStatus = 1
|
||||
CommittedStatus = 2
|
||||
RolledBackStatus = 3
|
||||
createdStatus = 0
|
||||
preparedStatus = 1
|
||||
committedStatus = 2
|
||||
rolledBackStatus = 3
|
||||
)
|
||||
|
||||
func genTaskKey(txId string) string {
|
||||
return "tx:" + txId
|
||||
func genTaskKey(txID string) string {
|
||||
return "tx:" + txID
|
||||
}
|
||||
|
||||
// NewTransaction creates a try-commit-catch distributed transaction
|
||||
func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]byte, keys []string) *Transaction {
|
||||
return &Transaction{
|
||||
id: id,
|
||||
@@ -49,7 +50,7 @@ func NewTransaction(cluster *Cluster, c redis.Connection, id string, args [][]by
|
||||
cluster: cluster,
|
||||
conn: c,
|
||||
keys: keys,
|
||||
status: CreatedStatus,
|
||||
status: createdStatus,
|
||||
mu: new(sync.Mutex),
|
||||
}
|
||||
}
|
||||
@@ -88,10 +89,10 @@ func (tx *Transaction) prepare() error {
|
||||
tx.undoLog[key] = nil // entity was nil, should be removed while rollback
|
||||
}
|
||||
}
|
||||
tx.status = PreparedStatus
|
||||
tx.status = preparedStatus
|
||||
taskKey := genTaskKey(tx.id)
|
||||
timewheel.Delay(maxLockTime, taskKey, func() {
|
||||
if tx.status == PreparedStatus { // rollback transaction uncommitted until expire
|
||||
if tx.status == preparedStatus { // rollback transaction uncommitted until expire
|
||||
logger.Info("abort transaction: " + tx.id)
|
||||
_ = tx.rollback()
|
||||
}
|
||||
@@ -105,9 +106,9 @@ func (tx *Transaction) rollback() error {
|
||||
defer tx.mu.Unlock()
|
||||
|
||||
if tx.status != curStatus { // ensure status not changed by other goroutine
|
||||
return errors.New(fmt.Sprintf("tx %s status changed", tx.id))
|
||||
return fmt.Errorf("tx %s status changed", tx.id)
|
||||
}
|
||||
if tx.status == RolledBackStatus { // no need to rollback a rolled-back transaction
|
||||
if tx.status == rolledBackStatus { // no need to rollback a rolled-back transaction
|
||||
return nil
|
||||
}
|
||||
tx.lockKeys()
|
||||
@@ -120,17 +121,17 @@ func (tx *Transaction) rollback() error {
|
||||
}
|
||||
}
|
||||
tx.unLockKeys()
|
||||
tx.status = RolledBackStatus
|
||||
tx.status = rolledBackStatus
|
||||
return nil
|
||||
}
|
||||
|
||||
// rollback local transaction
|
||||
// Rollback rollbacks local transaction
|
||||
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
|
||||
}
|
||||
txId := string(args[1])
|
||||
raw, ok := cluster.transactions.Get(txId)
|
||||
txID := string(args[1])
|
||||
raw, ok := cluster.transactions.Get(txID)
|
||||
if !ok {
|
||||
return reply.MakeIntReply(0)
|
||||
}
|
||||
@@ -146,13 +147,13 @@ func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return reply.MakeIntReply(1)
|
||||
}
|
||||
|
||||
// commit local transaction as a worker
|
||||
func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
// commit commits local transaction as a worker when receive commit command from coordinator
|
||||
func commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
|
||||
}
|
||||
txId := string(args[1])
|
||||
raw, ok := cluster.transactions.Get(txId)
|
||||
txID := string(args[1])
|
||||
raw, ok := cluster.transactions.Get(txID)
|
||||
if !ok {
|
||||
return reply.MakeIntReply(0)
|
||||
}
|
||||
@@ -164,40 +165,38 @@ func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
cmd := strings.ToLower(string(tx.args[0]))
|
||||
var result redis.Reply
|
||||
if cmd == "del" {
|
||||
result = CommitDel(cluster, c, tx)
|
||||
result = commitDel(cluster, c, tx)
|
||||
} else if cmd == "mset" {
|
||||
result = CommitMSet(cluster, c, tx)
|
||||
result = commitMSet(cluster, c, tx)
|
||||
}
|
||||
|
||||
if reply.IsErrorReply(result) {
|
||||
// failed
|
||||
err2 := tx.rollback()
|
||||
return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
|
||||
} else {
|
||||
}
|
||||
// after committed
|
||||
tx.unLockKeys()
|
||||
tx.status = CommittedStatus
|
||||
// clean transaction
|
||||
tx.status = committedStatus
|
||||
// clean finished transaction
|
||||
// do not clean immediately, in case rollback
|
||||
timewheel.Delay(waitBeforeCleanTx, "", func() {
|
||||
cluster.transactions.Remove(tx.id)
|
||||
})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// request all node commit transaction as leader
|
||||
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
|
||||
// requestCommit commands all node commit transaction as coordinator
|
||||
func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
|
||||
var errReply reply.ErrorReply
|
||||
txIdStr := strconv.FormatInt(txId, 10)
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
respList := make([]redis.Reply, 0, len(peers))
|
||||
for peer := range peers {
|
||||
var resp redis.Reply
|
||||
if peer == cluster.self {
|
||||
resp = Commit(cluster, c, makeArgs("commit", txIdStr))
|
||||
resp = commit(cluster, c, makeArgs("commit", txIDStr))
|
||||
} else {
|
||||
resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))
|
||||
resp = cluster.relay(peer, c, makeArgs("commit", txIDStr))
|
||||
}
|
||||
if reply.IsErrorReply(resp) {
|
||||
errReply = resp.(reply.ErrorReply)
|
||||
@@ -206,20 +205,20 @@ func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[s
|
||||
respList = append(respList, resp)
|
||||
}
|
||||
if errReply != nil {
|
||||
RequestRollback(cluster, c, txId, peers)
|
||||
requestRollback(cluster, c, txID, peers)
|
||||
return nil, errReply
|
||||
}
|
||||
return respList, nil
|
||||
}
|
||||
|
||||
// request all node rollback transaction as leader
|
||||
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {
|
||||
txIdStr := strconv.FormatInt(txId, 10)
|
||||
// requestRollback requests all node rollback transaction as coordinator
|
||||
func requestRollback(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) {
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
for peer := range peers {
|
||||
if peer == cluster.self {
|
||||
Rollback(cluster, c, makeArgs("rollback", txIdStr))
|
||||
Rollback(cluster, c, makeArgs("rollback", txIDStr))
|
||||
} else {
|
||||
cluster.Relay(peer, c, makeArgs("rollback", txIdStr))
|
||||
cluster.relay(peer, c, makeArgs("rollback", txIDStr))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -10,36 +10,36 @@ import (
|
||||
func TestRollback(t *testing.T) {
|
||||
// rollback uncommitted transaction
|
||||
FlushAll(testCluster, nil, toArgs("FLUSHALL"))
|
||||
txId := rand.Int63()
|
||||
txIdStr := strconv.FormatInt(txId, 10)
|
||||
txID := rand.Int63()
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
keys := []string{"a", "b"}
|
||||
groupMap := testCluster.groupBy(keys)
|
||||
args := []string{txIdStr}
|
||||
args := []string{txIDStr}
|
||||
args = append(args, keys...)
|
||||
testCluster.Exec(nil, toArgs("SET", "a", "a"))
|
||||
ret := PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||
ret := prepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||
asserts.AssertNotError(t, ret)
|
||||
RequestRollback(testCluster, nil, txId, groupMap)
|
||||
requestRollback(testCluster, nil, txID, groupMap)
|
||||
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||
asserts.AssertBulkReply(t, ret, "a")
|
||||
|
||||
// rollback committed transaction
|
||||
FlushAll(testCluster, nil, toArgs("FLUSHALL"))
|
||||
txId = rand.Int63()
|
||||
txIdStr = strconv.FormatInt(txId, 10)
|
||||
args = []string{txIdStr}
|
||||
txID = rand.Int63()
|
||||
txIDStr = strconv.FormatInt(txID, 10)
|
||||
args = []string{txIDStr}
|
||||
args = append(args, keys...)
|
||||
testCluster.Exec(nil, toArgs("SET", "a", "a"))
|
||||
ret = PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||
ret = prepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||
asserts.AssertNotError(t, ret)
|
||||
_, err := RequestCommit(testCluster, nil, txId, groupMap)
|
||||
_, err := requestCommit(testCluster, nil, txID, groupMap)
|
||||
if err != nil {
|
||||
t.Errorf("del failed %v", err)
|
||||
return
|
||||
}
|
||||
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||
asserts.AssertNullBulk(t, ret)
|
||||
RequestRollback(testCluster, nil, txId, groupMap)
|
||||
requestRollback(testCluster, nil, txID, groupMap)
|
||||
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||
asserts.AssertBulkReply(t, ret, "a")
|
||||
}
|
||||
|
1
go.mod
1
go.mod
@@ -6,4 +6,5 @@ require (
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.1
|
||||
github.com/shopspring/decimal v1.2.0
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
|
||||
golang.org/x/tools v0.1.0 // indirect
|
||||
)
|
||||
|
14
go.sum
14
go.sum
@@ -13,20 +13,34 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7 h1:EBZoQjiKKPaLbPrbpssUfuHtwM6KV/vb4U85g/cigFY=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
@@ -65,6 +65,7 @@ func unsubscribe0(hub *Hub, channel string, client redis.Connection) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Subscribe puts the given connection into the given channel
|
||||
func Subscribe(hub *Hub, c redis.Connection, args [][]byte) redis.Reply {
|
||||
channels := make([]string, len(args))
|
||||
for i, b := range args {
|
||||
|
Reference in New Issue
Block a user