other cluster codes

This commit is contained in:
finley
2025-02-03 18:37:07 +08:00
parent b699918223
commit 0bf068f51c
38 changed files with 175 additions and 4479 deletions

View File

@@ -2,234 +2,38 @@
package cluster
import (
"fmt"
"runtime/debug"
"strings"
"github.com/hdt3213/rdb/core"
"github.com/hdt3213/godis/config"
database2 "github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/datastruct/dict"
"github.com/hdt3213/godis/datastruct/set"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/idgenerator"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
"os"
"path"
"sync"
_ "github.com/hdt3213/godis/cluster/commands" // register nodes
"github.com/hdt3213/godis/cluster/core"
"github.com/hdt3213/godis/cluster/raft"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/logger"
)
// Cluster represents a node of godis cluster
// it holds part of data and coordinates other nodes to finish transactions
type Cluster struct {
self string
addr string
db database.DBEngine
transactions *dict.SimpleDict // id -> Transaction
transactionMu sync.RWMutex
topology topology
slotMu sync.RWMutex
slots map[uint32]*hostSlot
idGenerator *idgenerator.IDGenerator
clientFactory clientFactory
}
type peerClient interface {
Send(args [][]byte) redis.Reply
}
type peerStream interface {
Stream() <-chan *parser.Payload
Close() error
}
type clientFactory interface {
GetPeerClient(peerAddr string) (peerClient, error)
ReturnPeerClient(peerAddr string, peerClient peerClient) error
NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error)
Close() error
}
const (
slotStateHost = iota
slotStateImporting
slotStateMovingOut
)
// hostSlot stores the status of a slot hosted by the current node
type hostSlot struct {
state uint32
mu sync.RWMutex
// oldNodeID is the node which is moving out this slot
// only valid while the slot is importing
oldNodeID string
// newNodeID is the node which is importing this slot
// only valid while the slot is moving out
newNodeID string
/* importedKeys stores keys imported during the migration process
 * While this slot is migrating, if importedKeys does not contain the given key, the current node will import the key before executing commands
 *
 * In a migrating slot, the slot on the old node is immutable; we only delete keys on the new node.
 * Therefore, we must distinguish between non-migrated keys and deleted keys.
 * Even if a key has been deleted, it still exists in importedKeys, so we can tell non-migrated keys apart from deleted ones.
 */
importedKeys *set.Set
// keys stores all keys in this slot
// Cluster.makeInsertCallback and Cluster.makeDeleteCallback keep keys up to date
keys *set.Set
}
// if only one node is involved in a transaction, just execute the command without applying the TCC procedure
var allowFastTransaction = true
type Cluster = core.Cluster
// MakeCluster creates and starts a node of cluster
func MakeCluster() *Cluster {
cluster := &Cluster{
self: config.Properties.Self,
addr: config.Properties.AnnounceAddress(),
db: database2.NewStandaloneServer(),
transactions: dict.MakeSimple(),
idGenerator: idgenerator.MakeGenerator(config.Properties.Self),
clientFactory: newDefaultClientFactory(),
}
topologyPersistFile := path.Join(config.Properties.Dir, config.Properties.ClusterConfigFile)
cluster.topology = newRaft(cluster, topologyPersistFile)
cluster.db.SetKeyInsertedCallback(cluster.makeInsertCallback())
cluster.db.SetKeyDeletedCallback(cluster.makeDeleteCallback())
cluster.slots = make(map[uint32]*hostSlot)
var err error
if topologyPersistFile != "" && fileExists(topologyPersistFile) {
err = cluster.LoadConfig()
} else if config.Properties.ClusterAsSeed {
err = cluster.startAsSeed(config.Properties.AnnounceAddress())
} else {
err = cluster.Join(config.Properties.ClusterSeed)
}
raftPath := path.Join(config.Properties.Dir, "raft")
err := os.MkdirAll(raftPath, os.ModePerm)
if err != nil {
panic(err)
}
cluster, err := core.NewCluster(&core.Config{
RaftConfig: raft.RaftConfig{
RedisAdvertiseAddr: config.Properties.AnnounceAddress(),
RaftListenAddr: config.Properties.RaftListenAddr,
RaftAdvertiseAddr: config.Properties.RaftAdvertiseAddr,
Dir: raftPath,
},
StartAsSeed: config.Properties.ClusterAsSeed,
JoinAddress: config.Properties.ClusterSeed,
})
if err != nil {
logger.Error(err.Error())
panic(err)
}
return cluster
}
// CmdFunc represents the handler of a redis command
type CmdFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply
// Close stops the current node of the cluster
func (cluster *Cluster) Close() {
_ = cluster.topology.Close()
cluster.db.Close()
cluster.clientFactory.Close()
}
func isAuthenticated(c redis.Connection) bool {
if config.Properties.RequirePass == "" {
return true
}
return c.GetPassword() == config.Properties.RequirePass
}
// Exec executes a command on the cluster
func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &protocol.UnknownErrReply{}
}
}()
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "info" {
if ser, ok := cluster.db.(*database2.Server); ok {
return database2.Info(ser, cmdLine[1:])
}
}
if cmdName == "auth" {
return database2.Auth(c, cmdLine[1:])
}
if !isAuthenticated(c) {
return protocol.MakeErrReply("NOAUTH Authentication required")
}
if cmdName == "dbsize" {
if ser, ok := cluster.db.(*database2.Server); ok {
return database2.DbSize(c, ser)
}
}
if cmdName == "multi" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return database2.StartMulti(c)
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return database2.DiscardMulti(c)
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return protocol.MakeArgNumErrReply(cmdName)
}
return execMulti(cluster, c, nil)
} else if cmdName == "select" {
return protocol.MakeErrReply("select not supported in cluster")
}
if c != nil && c.InMultiState() {
return database2.EnqueueCmd(c, cmdLine)
}
cmdFunc, ok := router[cmdName]
if !ok {
return protocol.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode")
}
result = cmdFunc(cluster, c, cmdLine)
return
}
// AfterClientClose does some cleanup after a client closes its connection
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
cluster.db.AfterClientClose(c)
}
func (cluster *Cluster) LoadRDB(dec *core.Decoder) error {
return cluster.db.LoadRDB(dec)
}
func (cluster *Cluster) makeInsertCallback() database.KeyEventCallback {
return func(dbIndex int, key string, entity *database.DataEntity) {
slotId := getSlot(key)
cluster.slotMu.RLock()
slot, ok := cluster.slots[slotId]
cluster.slotMu.RUnlock()
// As long as the command is executed, we should update slot.keys regardless of slot.state
if ok {
slot.mu.Lock()
defer slot.mu.Unlock()
slot.keys.Add(key)
}
}
}
func (cluster *Cluster) makeDeleteCallback() database.KeyEventCallback {
return func(dbIndex int, key string, entity *database.DataEntity) {
slotId := getSlot(key)
cluster.slotMu.RLock()
slot, ok := cluster.slots[slotId]
cluster.slotMu.RUnlock()
// As long as the command is executed, we should update slot.keys regardless of slot.state
if ok {
slot.mu.Lock()
defer slot.mu.Unlock()
slot.keys.Remove(key)
}
}
}
func fileExists(filename string) bool {
info, err := os.Stat(filename)
return err == nil && !info.IsDir()
}
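For orientation, a minimal usage sketch of the node API kept in this file after the refactor: build the node via MakeCluster and feed it command lines through Exec. The fake connection and ToCmdLine helpers are the same ones used by the tests further down; treat this as an illustration, not the repository's actual server wiring.

// Sketch: construct a cluster node and run commands against it.
// Assumes config.Properties has already been loaded; error handling elided.
package main

import (
	"github.com/hdt3213/godis/cluster"
	"github.com/hdt3213/godis/lib/utils"
	"github.com/hdt3213/godis/redis/connection"
)

func main() {
	node := cluster.MakeCluster() // seeds or joins the cluster according to config
	defer node.Close()

	conn := connection.NewFakeConn() // stand-in for a real client connection
	node.Exec(conn, utils.ToCmdLine("SET", "k", "v"))
	node.Exec(conn, utils.ToCmdLine("GET", "k"))
}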

View File

@@ -1,86 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
)
// relay relays the command to the peer, or calls cluster.Exec if the target is the current node
func (cluster *Cluster) relay(peerId string, c redis.Connection, cmdLine [][]byte) redis.Reply {
// use a variable to allow injecting stub for testing, see defaultRelayImpl
if peerId == cluster.self {
// to self db
return cluster.Exec(c, cmdLine)
}
// peerId is peer.Addr
cli, err := cluster.clientFactory.GetPeerClient(peerId)
if err != nil {
return protocol.MakeErrReply(err.Error())
}
defer func() {
_ = cluster.clientFactory.ReturnPeerClient(peerId, cli)
}()
return cli.Send(cmdLine)
}
// relayByKey relays the command to a peer
// the peer node is determined by routeKey
func (cluster *Cluster) relayByKey(routeKey string, c redis.Connection, args [][]byte) redis.Reply {
slotId := getSlot(routeKey)
peer := cluster.pickNode(slotId)
return cluster.relay(peer.ID, c, args)
}
// broadcast broadcasts the command to all nodes in the cluster
func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
result := make(map[string]redis.Reply)
for _, node := range cluster.topology.GetNodes() {
reply := cluster.relay(node.ID, c, args)
result[node.Addr] = reply
}
return result
}
// ensureKey migrates the key to the current node if the key belongs to a slot that is being imported by the current node
// the invoker should hold the locks of the key
func (cluster *Cluster) ensureKey(key string) protocol.ErrorReply {
slotId := getSlot(key)
cluster.slotMu.RLock()
slot := cluster.slots[slotId]
cluster.slotMu.RUnlock()
if slot == nil {
return nil
}
if slot.state != slotStateImporting || slot.importedKeys.Has(key) {
return nil
}
resp := cluster.relay(slot.oldNodeID, connection.NewFakeConn(), utils.ToCmdLine("DumpKey_", key))
if protocol.IsErrorReply(resp) {
return resp.(protocol.ErrorReply)
}
if protocol.IsEmptyMultiBulkReply(resp) {
slot.importedKeys.Add(key)
return nil
}
dumpResp := resp.(*protocol.MultiBulkReply)
if len(dumpResp.Args) != 2 {
return protocol.MakeErrReply("illegal dump key response")
}
// reuse the CopyTo command ^_^
resp = cluster.db.Exec(connection.NewFakeConn(), [][]byte{
[]byte("CopyTo"), []byte(key), dumpResp.Args[0], dumpResp.Args[1],
})
if protocol.IsErrorReply(resp) {
return resp.(protocol.ErrorReply)
}
slot.importedKeys.Add(key)
return nil
}
func (cluster *Cluster) ensureKeyWithoutLock(key string) protocol.ErrorReply {
cluster.db.RWLocks(0, []string{key}, nil)
defer cluster.db.RWUnLocks(0, []string{key}, nil)
return cluster.ensureKey(key)
}
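To see how these helpers fit together, here is a hypothetical per-key read handler, sketched under the assumption that getSlot, pickNode and the methods above keep the signatures used in this file; it is not the repository's actual default handler, only an illustration of the ensureKey-then-execute versus relay split.

// Hypothetical handler, for illustration only: route a single-key read.
func execGetSketch(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply("get")
	}
	key := string(cmdLine[1])
	node := cluster.pickNode(getSlot(key))
	if node.ID != cluster.self {
		// the key belongs to another node: forward the whole command line
		return cluster.relay(node.ID, c, cmdLine)
	}
	// the key belongs to this node: pull it over first if its slot is still importing
	if errReply := cluster.ensureKeyWithoutLock(key); errReply != nil {
		return errReply
	}
	return cluster.db.Exec(c, cmdLine)
}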

View File

@@ -1,142 +0,0 @@
package cluster
import (
"errors"
"fmt"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/datastruct/dict"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/pool"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/client"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
"net"
)
type defaultClientFactory struct {
nodeConnections dict.Dict // map[string]*pool.Pool
}
var connectionPoolConfig = pool.Config{
MaxIdle: 1,
MaxActive: 16,
}
// GetPeerClient gets a client connected to the peer from the pool
func (factory *defaultClientFactory) GetPeerClient(peerAddr string) (peerClient, error) {
var connectionPool *pool.Pool
raw, ok := factory.nodeConnections.Get(peerAddr)
if !ok {
creator := func() (interface{}, error) {
c, err := client.MakeClient(peerAddr)
if err != nil {
return nil, err
}
c.Start()
// all peers of the cluster should use the same password
if config.Properties.RequirePass != "" {
authResp := c.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass))
if !protocol.IsOKReply(authResp) {
return nil, fmt.Errorf("auth failed, resp: %s", string(authResp.ToBytes()))
}
}
return c, nil
}
finalizer := func(x interface{}) {
logger.Debug("destroy client")
cli, ok := x.(client.Client)
if !ok {
return
}
cli.Close()
}
connectionPool = pool.New(creator, finalizer, connectionPoolConfig)
factory.nodeConnections.Put(peerAddr, connectionPool)
} else {
connectionPool = raw.(*pool.Pool)
}
raw, err := connectionPool.Get()
if err != nil {
return nil, err
}
conn, ok := raw.(*client.Client)
if !ok {
return nil, errors.New("connection pool make wrong type")
}
return conn, nil
}
// ReturnPeerClient returns a client to the pool
func (factory *defaultClientFactory) ReturnPeerClient(peer string, peerClient peerClient) error {
raw, ok := factory.nodeConnections.Get(peer)
if !ok {
return errors.New("connection pool not found")
}
raw.(*pool.Pool).Put(peerClient)
return nil
}
type tcpStream struct {
conn net.Conn
ch <-chan *parser.Payload
}
func (s *tcpStream) Stream() <-chan *parser.Payload {
return s.ch
}
func (s *tcpStream) Close() error {
return s.conn.Close()
}
func (factory *defaultClientFactory) NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error) {
// todo: reuse connection
conn, err := net.Dial("tcp", peerAddr)
if err != nil {
return nil, fmt.Errorf("connect with %s failed: %v", peerAddr, err)
}
ch := parser.ParseStream(conn)
send2node := func(cmdLine CmdLine) redis.Reply {
req := protocol.MakeMultiBulkReply(cmdLine)
_, err := conn.Write(req.ToBytes())
if err != nil {
return protocol.MakeErrReply(err.Error())
}
resp := <-ch
if resp.Err != nil {
return protocol.MakeErrReply(resp.Err.Error())
}
return resp.Data
}
if config.Properties.RequirePass != "" {
authResp := send2node(utils.ToCmdLine("AUTH", config.Properties.RequirePass))
if !protocol.IsOKReply(authResp) {
return nil, fmt.Errorf("auth failed, resp: %s", string(authResp.ToBytes()))
}
}
req := protocol.MakeMultiBulkReply(cmdLine)
_, err = conn.Write(req.ToBytes())
if err != nil {
return nil, protocol.MakeErrReply("send cmdLine failed: " + err.Error())
}
return &tcpStream{
conn: conn,
ch: ch,
}, nil
}
func newDefaultClientFactory() *defaultClientFactory {
return &defaultClientFactory{
nodeConnections: dict.MakeConcurrent(1),
}
}
func (factory *defaultClientFactory) Close() error {
factory.nodeConnections.ForEach(func(key string, val interface{}) bool {
val.(*pool.Pool).Close()
return true
})
return nil
}
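For completeness, a short sketch of how the stream interface implemented above could be consumed; the command name is a placeholder, not a real command in the repository, and the function lives in the same package as the code above.

// Sketch: open a stream to a peer and drain its payloads.
func consumeStreamSketch(factory clientFactory, peerAddr string) error {
	stream, err := factory.NewStream(peerAddr, utils.ToCmdLine("SomeStreamingCmd_")) // placeholder command
	if err != nil {
		return err
	}
	defer stream.Close()
	for payload := range stream.Stream() {
		if payload.Err != nil {
			return payload.Err
		}
		logger.Debug(string(payload.Data.ToBytes()))
	}
	return nil
}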

View File

@@ -1,58 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestExec(t *testing.T) {
testCluster := testCluster[0]
conn := connection.NewFakeConn()
for i := 0; i < 1000; i++ {
key := RandString(4)
value := RandString(4)
testCluster.Exec(conn, toArgs("SET", key, value))
ret := testCluster.Exec(conn, toArgs("GET", key))
asserts.AssertBulkReply(t, ret, value)
}
}
func TestAuth(t *testing.T) {
passwd := utils.RandString(10)
config.Properties.RequirePass = passwd
defer func() {
config.Properties.RequirePass = ""
}()
conn := connection.NewFakeConn()
testCluster := testCluster[0]
ret := testCluster.Exec(conn, toArgs("GET", "a"))
asserts.AssertErrReply(t, ret, "NOAUTH Authentication required")
ret = testCluster.Exec(conn, toArgs("AUTH", passwd))
asserts.AssertStatusReply(t, ret, "OK")
ret = testCluster.Exec(conn, toArgs("GET", "a"))
asserts.AssertNotError(t, ret)
}
func TestRelay(t *testing.T) {
testNodeA := testCluster[1]
key := RandString(4)
value := RandString(4)
conn := connection.NewFakeConn()
ret := testNodeA.relay(addresses[1], conn, toArgs("SET", key, value))
asserts.AssertNotError(t, ret)
ret = testNodeA.relay(addresses[1], conn, toArgs("GET", key))
asserts.AssertBulkReply(t, ret, value)
}
func TestBroadcast(t *testing.T) {
testCluster2 := testCluster[0]
key := RandString(4)
value := RandString(4)
rets := testCluster2.broadcast(connection.NewFakeConn(), toArgs("SET", key, value))
for _, v := range rets {
asserts.AssertNotError(t, v)
}
}

View File

@@ -0,0 +1,95 @@
package commands
import "github.com/hdt3213/godis/cluster/core"
func RegisterCommands() {
defaultCmds := []string{
"expire",
"expireAt",
"pExpire",
"pExpireAt",
"ttl",
"PTtl",
"persist",
"exists",
"type",
"set",
"setNx",
"setEx",
"pSetEx",
"get",
"getEx",
"getSet",
"getDel",
"incr",
"incrBy",
"incrByFloat",
"decr",
"decrBy",
"lPush",
"lPushX",
"rPush",
"rPushX",
"LPop",
"RPop",
"LRem",
"LLen",
"LIndex",
"LSet",
"LRange",
"HSet",
"HSetNx",
"HGet",
"HExists",
"HDel",
"HLen",
"HStrLen",
"HMGet",
"HMSet",
"HKeys",
"HVals",
"HGetAll",
"HIncrBy",
"HIncrByFloat",
"HRandField",
"SAdd",
"SIsMember",
"SRem",
"SPop",
"SCard",
"SMembers",
"SInter",
"SInterStore",
"SUnion",
"SUnionStore",
"SDiff",
"SDiffStore",
"SRandMember",
"ZAdd",
"ZScore",
"ZIncrBy",
"ZRank",
"ZCount",
"ZRevRank",
"ZCard",
"ZRange",
"ZRevRange",
"ZRangeByScore",
"ZRevRangeByScore",
"ZRem",
"ZRemRangeByScore",
"ZRemRangeByRank",
"GeoAdd",
"GeoPos",
"GeoDist",
"GeoHash",
"GeoRadius",
"GeoRadiusByMember",
"GetVer",
"DumpKey",
}
for _, name := range defaultCmds {
core.RegisterDefaultCmd(name)
}
}
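How this registration is triggered is not shown in this diff; below is a minimal sketch, assuming the handlers must be registered before the node starts serving (the blank import of this package in cluster.go suggests init-based registration may be used instead).

// Sketch: register the default command handlers before building the node.
package main

import (
	"github.com/hdt3213/godis/cluster"
	"github.com/hdt3213/godis/cluster/commands"
)

func main() {
	commands.RegisterCommands()
	node := cluster.MakeCluster()
	defer node.Close()
}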

View File

@@ -1,119 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
"strings"
)
const copyToAnotherDBErr = "ERR Copying to another database is not allowed in cluster mode"
const noReplace = "NoReplace"
const useReplace = "UseReplace"
// Copy copies the value stored at the source key to the destination key.
// if the source and the destination are on different nodes, Copy uses the TCC procedure
func Copy(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 3 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'copy' command")
}
srcKey := string(args[1])
destKey := string(args[2])
srcNode := cluster.pickNodeAddrByKey(srcKey)
destNode := cluster.pickNodeAddrByKey(destKey)
replaceFlag := noReplace
if len(args) > 3 {
for i := 3; i < len(args); i++ {
arg := strings.ToLower(string(args[i]))
if arg == "db" {
return protocol.MakeErrReply(copyToAnotherDBErr)
} else if arg == "replace" {
replaceFlag = useReplace
} else {
return protocol.MakeSyntaxErrReply()
}
}
}
if srcNode == destNode {
args[0] = []byte("Copy_") // Copy_ will go directly to cluster.DB avoiding infinite recursion
return cluster.relay(srcNode, c, args)
}
groupMap := map[string][]string{
srcNode: {srcKey},
destNode: {destKey},
}
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
// prepare Copy from
srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "CopyFrom", srcKey))
if protocol.IsErrorReply(srcPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return srcPrepareResp
}
srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
if !ok || len(srcPrepareMBR.Args) < 2 {
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return protocol.MakeErrReply("ERR invalid prepare response")
}
// prepare Copy to
destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
[]byte("CopyTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1], []byte(replaceFlag)))
if destErr, ok := destPrepareResp.(protocol.ErrorReply); ok {
// rollback all prepared nodes
requestRollback(cluster, c, txID, groupMap)
if destErr.Error() == keyExistsErr {
return protocol.MakeIntReply(0)
}
return destPrepareResp
}
if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
requestRollback(cluster, c, txID, groupMap)
return errReply
}
return protocol.MakeIntReply(1)
}
// prepareCopyFrom is the prepare function for CopyFrom, see prepareFuncMap
func prepareCopyFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeArgNumErrReply("CopyFrom")
}
key := string(cmdLine[1])
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
if protocol.IsErrorReply(existResp) {
return existResp
}
existIntResp := existResp.(*protocol.IntReply)
if existIntResp.Code == 0 {
return protocol.MakeErrReply("ERR no such key")
}
return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key))
}
func prepareCopyTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 5 {
return protocol.MakeArgNumErrReply("CopyTo")
}
key := string(cmdLine[1])
replaceFlag := string(cmdLine[4])
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
if protocol.IsErrorReply(existResp) {
return existResp
}
existIntResp := existResp.(*protocol.IntReply)
if existIntResp.Code == 1 {
if replaceFlag == noReplace {
return protocol.MakeErrReply(keyExistsErr)
}
}
return protocol.MakeOkReply()
}
func init() {
registerPrepareFunc("CopyFrom", prepareCopyFrom)
registerPrepareFunc("CopyTo", prepareCopyTo)
}

View File

@@ -1,128 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestCopy(t *testing.T) {
conn := new(connection.FakeConn)
testNodeA := testCluster[0]
testNodeB := testCluster[1]
testNodeA.Exec(conn, utils.ToCmdLine("FlushALL"))
testNodeB.Exec(conn, utils.ToCmdLine("FlushALL"))
// cross node copy
srcKey := "127.0.0.1:6399Bk2r3Sz0V5" // use fix key to ensure hashing to different node
destKey := "127.0.0.1:7379CcdC0QOopF"
value := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value))
result := Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey))
asserts.AssertBulkReply(t, result, value)
result = testNodeB.Exec(conn, utils.ToCmdLine("GET", destKey))
asserts.AssertBulkReply(t, result, value)
// key exists
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey))
asserts.AssertIntReply(t, result, 0)
// replace
value = utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE"))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey))
asserts.AssertBulkReply(t, result, value)
result = testNodeB.Exec(conn, utils.ToCmdLine("GET", destKey))
asserts.AssertBulkReply(t, result, value)
// test copy expire time
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "EX", "1000"))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE"))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.Exec(conn, utils.ToCmdLine("TTL", destKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
// same node copy
srcKey = "{" + testNodeA.self + "}" + utils.RandString(10)
destKey = "{" + testNodeA.self + "}" + utils.RandString(9)
value = utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey))
asserts.AssertBulkReply(t, result, value)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", destKey))
asserts.AssertBulkReply(t, result, value)
// key exists
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey))
asserts.AssertIntReply(t, result, 0)
// replace
value = utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE"))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", srcKey))
asserts.AssertBulkReply(t, result, value)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", destKey))
asserts.AssertBulkReply(t, result, value)
// test copy expire time
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "EX", "1000"))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "REPLACE"))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", destKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
}
func TestCopyTimeout(t *testing.T) {
conn := new(connection.FakeConn)
testNodeA := testCluster[0]
testNodeB := testCluster[1]
testNodeA.Exec(conn, utils.ToCmdLine("FlushALL"))
testNodeB.Exec(conn, utils.ToCmdLine("FlushALL"))
// test src prepare failed
timeoutFlags[0] = true
srcKey := "127.0.0.1:6399Bk2r3Sz0V5" // use fix key to ensure hashing to different node
destKey := "127.0.0.1:7379CcdC0QOopF"
value := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "ex", "1000"))
result := Rename(testNodeB, conn, utils.ToCmdLine("RENAME", srcKey, destKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", srcKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.Exec(conn, utils.ToCmdLine("EXISTS", destKey))
asserts.AssertIntReply(t, result, 0)
timeoutFlags[0] = false
// test dest prepare failed
timeoutFlags[1] = true
value = utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value, "ex", "1000"))
result = Rename(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", srcKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", srcKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.Exec(conn, utils.ToCmdLine("EXISTS", destKey))
asserts.AssertIntReply(t, result, 0)
timeoutFlags[1] = false
// Copying to another database
srcKey = testNodeA.self + utils.RandString(10)
value = utils.RandString(10)
destKey = srcKey + utils.RandString(2)
testNodeA.Exec(conn, utils.ToCmdLine("SET", srcKey, value))
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey, destKey, "db", "1"))
asserts.AssertErrReply(t, result, copyToAnotherDBErr)
result = Copy(testNodeA, conn, utils.ToCmdLine("COPY", srcKey))
asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'copy' command")
}

View File

@@ -7,6 +7,7 @@ import (
dbimpl "github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/datastruct/set"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
)
@@ -130,3 +131,16 @@ func NewCluster(cfg *Config) (*Cluster, error) {
cluster.injectDeleteCallback()
return cluster, nil
}
// AfterClientClose does some cleanup after a client closes its connection
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
}
func (cluster *Cluster) Close() {
cluster.db.Close()
err := cluster.raftNode.Close()
if err != nil {
panic(err)
}
}

View File

@@ -6,6 +6,9 @@ import (
"time"
"github.com/hdt3213/godis/cluster/raft"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
)
func TestClusterBootstrap(t *testing.T) {
@@ -16,6 +19,9 @@ func TestClusterBootstrap(t *testing.T) {
defer func() {
os.RemoveAll(leaderDir)
}()
RegisterDefaultCmd("get")
RegisterDefaultCmd("set")
// connection stub
connections := NewInMemConnectionFactory()
leaderCfg := &Config{
@@ -35,6 +41,20 @@ func TestClusterBootstrap(t *testing.T) {
}
connections.nodes[leaderCfg.RedisAdvertiseAddr] = leader
// set key-values for test
testEntries := make(map[string]string)
c := connection.NewFakeConn()
for i := 0; i < 1000; i++ {
key := utils.RandString(10)
value := utils.RandString(10)
testEntries[key] = value
result := leader.Exec(c, utils.ToCmdLine("set", key, value))
if !protocol.IsOKReply(result) {
t.Errorf("command [set] failed: %s", string(result.ToBytes()))
return
}
}
// start follower
followerDir := "test/1"
os.RemoveAll(followerDir)
@@ -100,4 +120,15 @@ func TestClusterBootstrap(t *testing.T) {
time.Sleep(time.Second)
}
}
// verify the key-values that were set before the follower joined
for key, value := range testEntries {
c := connection.NewFakeConn()
result := leader.Exec(c, utils.ToCmdLine("get", key))
result2 := result.(*protocol.BulkReply)
if string(result2.Arg) != value {
t.Errorf("command [get] failed: %s", string(result.ToBytes()))
return
}
}
}

View File

@@ -1,61 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
// Del atomically removes the given keys from the cluster; the keys can be distributed on any node
// if the given keys are distributed on different nodes, Del uses try-commit-catch (TCC) to remove them
func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'del' command")
}
keys := make([]string, len(args)-1)
for i := 1; i < len(args); i++ {
keys[i-1] = string(args[i])
}
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 && allowFastTransaction { // do fast
for peer, group := range groupMap { // only one peerKeys
return cluster.relay(peer, c, makeArgs("Del_", group...))
}
}
// prepare
var errReply redis.Reply
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
rollback := false
for peer, peerKeys := range groupMap {
peerArgs := []string{txIDStr, "DEL"}
peerArgs = append(peerArgs, peerKeys...)
var resp redis.Reply
resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...))
if protocol.IsErrorReply(resp) {
errReply = resp
rollback = true
break
}
}
var respList []redis.Reply
if rollback {
// rollback
requestRollback(cluster, c, txID, groupMap)
} else {
// commit
respList, errReply = requestCommit(cluster, c, txID, groupMap)
if errReply != nil {
rollback = true
}
}
if !rollback {
var deleted int64 = 0
for _, resp := range respList {
intResp := resp.(*protocol.IntReply)
deleted += intResp.Code
}
return protocol.MakeIntReply(int64(deleted))
}
return errReply
}
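groupBy is not shown in this diff, but from its usage above it returns a map from peer ID to the keys owned by that peer; the fast path then skips the TCC round trip when only one node is involved. An illustrative, self-contained example of that check (node addresses and keys are made up):

// Illustration only: the shape of groupBy's result and Del's fast-path check.
package main

import "fmt"

func main() {
	allowFastTransaction := true
	groupMap := map[string][]string{
		"127.0.0.1:6399": {"a", "c"}, // keys whose slots live on node :6399
		"127.0.0.1:7379": {"b"},      // keys whose slots live on node :7379
	}
	if len(groupMap) == 1 && allowFastTransaction {
		fmt.Println("fast path: a single node owns every key, no TCC needed")
	} else {
		fmt.Println("TCC path: prepare, then commit or rollback on each node")
	}
}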

View File

@@ -1,18 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestDel(t *testing.T) {
conn := connection.NewFakeConn()
allowFastTransaction = false
testNodeA := testCluster[0]
testNodeA.Exec(conn, toArgs("SET", "a", "a"))
ret := Del(testNodeA, conn, toArgs("DEL", "a", "b", "c"))
asserts.AssertNotError(t, ret)
ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertNullBulk(t, ret)
}

View File

@@ -1,58 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/redis/protocol"
"sync"
)
// fixedTopology is a fixed cluster topology, used for testing
type fixedTopology struct {
mu sync.RWMutex
nodeMap map[string]*Node
slots []*Slot
selfNodeID string
}
func (fixed *fixedTopology) GetSelfNodeID() string {
return fixed.selfNodeID
}
func (fixed *fixedTopology) GetNodes() []*Node {
fixed.mu.RLock()
defer fixed.mu.RUnlock()
result := make([]*Node, 0, len(fixed.nodeMap))
for _, v := range fixed.nodeMap {
result = append(result, v)
}
return result
}
func (fixed *fixedTopology) GetNode(nodeID string) *Node {
fixed.mu.RLock()
defer fixed.mu.RUnlock()
return fixed.nodeMap[nodeID]
}
func (fixed *fixedTopology) GetSlots() []*Slot {
return fixed.slots
}
func (fixed *fixedTopology) StartAsSeed(addr string) protocol.ErrorReply {
return nil
}
func (fixed *fixedTopology) LoadConfigFile() protocol.ErrorReply {
return nil
}
func (fixed *fixedTopology) Join(seed string) protocol.ErrorReply {
return protocol.MakeErrReply("fixed topology does not support join")
}
func (fixed *fixedTopology) SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply {
return protocol.MakeErrReply("fixed topology does not support set slots")
}
func (fixed *fixedTopology) Close() error {
return nil
}

View File

@@ -1,27 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/protocol"
)
// FlushDB removes all data in the current database
func FlushDB(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply {
replies := cluster.broadcast(c, modifyCmd(cmdLine, "FlushDB_"))
var errReply protocol.ErrorReply
for _, v := range replies {
if protocol.IsErrorReply(v) {
errReply = v.(protocol.ErrorReply)
break
}
}
if errReply == nil {
return &protocol.OkReply{}
}
return protocol.MakeErrReply("error occurs: " + errReply.Error())
}
// FlushAll removes all data in the cluster
func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return FlushDB(cluster, c, args)
}

View File

@@ -1,181 +0,0 @@
package cluster
import (
"fmt"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
const keyExistsErr = "key exists"
// MGet atomically gets multiple key-value pairs from the cluster; the keys can be distributed on any node
func MGet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 2 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'mget' command")
}
keys := make([]string, len(cmdLine)-1)
for i := 1; i < len(cmdLine); i++ {
keys[i-1] = string(cmdLine[i])
}
resultMap := make(map[string][]byte)
groupMap := cluster.groupBy(keys)
for peer, groupKeys := range groupMap {
resp := cluster.relay(peer, c, makeArgs("MGet_", groupKeys...))
if protocol.IsErrorReply(resp) {
errReply := resp.(protocol.ErrorReply)
return protocol.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", groupKeys[0], errReply.Error()))
}
arrReply, _ := resp.(*protocol.MultiBulkReply)
for i, v := range arrReply.Args {
key := groupKeys[i]
resultMap[key] = v
}
}
result := make([][]byte, len(keys))
for i, k := range keys {
result[i] = resultMap[k]
}
return protocol.MakeMultiBulkReply(result)
}
// MSet atomically sets multiple key-value pairs in the cluster; the keys can be distributed on any node
func MSet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
argCount := len(cmdLine) - 1
if argCount%2 != 0 || argCount < 1 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'mset' command")
}
size := argCount / 2
keys := make([]string, size)
valueMap := make(map[string]string)
for i := 0; i < size; i++ {
keys[i] = string(cmdLine[2*i+1])
valueMap[keys[i]] = string(cmdLine[2*i+2])
}
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 && allowFastTransaction { // do fast
for peer := range groupMap {
return cluster.relay(peer, c, modifyCmd(cmdLine, "MSet_"))
}
}
//prepare
var errReply redis.Reply
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
rollback := false
for peer, group := range groupMap {
peerArgs := []string{txIDStr, "MSET"}
for _, k := range group {
peerArgs = append(peerArgs, k, valueMap[k])
}
resp := cluster.relay(peer, c, makeArgs("Prepare", peerArgs...))
if protocol.IsErrorReply(resp) {
errReply = resp
rollback = true
break
}
}
if rollback {
// rollback
requestRollback(cluster, c, txID, groupMap)
} else {
_, errReply = requestCommit(cluster, c, txID, groupMap)
rollback = errReply != nil
}
if !rollback {
return &protocol.OkReply{}
}
return errReply
}
// MSetNX sets multiple key-value pairs in the database only if none of the given keys exist; keys on different nodes are handled via the TCC procedure
func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
argCount := len(cmdLine) - 1
if argCount%2 != 0 || argCount < 1 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'msetnx' command")
}
size := argCount / 2
keys := make([]string, size)
valueMap := make(map[string]string)
for i := 0; i < size; i++ {
keys[i] = string(cmdLine[2*i+1])
valueMap[keys[i]] = string(cmdLine[2*i+2])
}
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 && allowFastTransaction { // do fast
for peer := range groupMap {
return cluster.relay(peer, c, modifyCmd(cmdLine, "MSetNX_"))
}
}
// prepare procedure:
// 1. Normal TCC preparation (write the undo log and lock the related keys)
// 2. The peer checks whether any key already exists; if so, it returns keyExistsErr and the coordinator requests a rollback on all participating nodes
var errReply redis.Reply
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
rollback := false
for node, group := range groupMap {
nodeArgs := []string{txIDStr, "MSETNX"}
for _, k := range group {
nodeArgs = append(nodeArgs, k, valueMap[k])
}
resp := cluster.relay(node, c, makeArgs("Prepare", nodeArgs...))
if protocol.IsErrorReply(resp) {
re := resp.(protocol.ErrorReply)
if re.Error() == keyExistsErr {
errReply = protocol.MakeIntReply(0)
} else {
errReply = resp
}
rollback = true
break
}
}
if rollback {
// rollback
requestRollback(cluster, c, txID, groupMap)
return errReply
}
_, errReply = requestCommit(cluster, c, txID, groupMap)
rollback = errReply != nil
if !rollback {
return protocol.MakeIntReply(1)
}
return errReply
}
func prepareMSetNx(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
args := cmdLine[1:]
if len(args)%2 != 0 {
return protocol.MakeSyntaxErrReply()
}
size := len(args) / 2
values := make([][]byte, size)
keys := make([]string, size)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i])
values[i] = args[2*i+1]
}
re := cluster.db.ExecWithLock(conn, utils.ToCmdLine2("ExistIn", keys...))
if protocol.IsErrorReply(re) {
return re
}
_, ok := re.(*protocol.EmptyMultiBulkReply)
if !ok {
return protocol.MakeErrReply(keyExistsErr)
}
return protocol.MakeOkReply()
}
func init() {
registerPrepareFunc("MSetNx", prepareMSetNx)
}

View File

@@ -1,30 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestMSet(t *testing.T) {
conn := connection.NewFakeConn()
allowFastTransaction = false
testNodeA := testCluster[0]
ret := MSet(testNodeA, conn, toArgs("MSET", "a", "a", "b", "b"))
asserts.AssertNotError(t, ret)
ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b"))
asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"})
}
func TestMSetNx(t *testing.T) {
conn := connection.NewFakeConn()
allowFastTransaction = false
testNodeA := testCluster[0]
FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
ret := MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "b", "b"))
asserts.AssertNotError(t, ret)
ret = MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "c", "c"))
asserts.AssertNotError(t, ret)
ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b", "c"))
asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""})
}

View File

@@ -1,165 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
const relayMulti = "multi_"
const innerWatch = "watch_"
var relayMultiBytes = []byte(relayMulti)
// cmdLine == []string{"exec"}
func execMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if !conn.InMultiState() {
return protocol.MakeErrReply("ERR EXEC without MULTI")
}
defer conn.SetMultiState(false)
cmdLines := conn.GetQueuedCmdLine()
// analyze related keys
keys := make([]string, 0) // may contain duplicates
for _, cl := range cmdLines {
wKeys, rKeys := database.GetRelatedKeys(cl)
keys = append(keys, wKeys...)
keys = append(keys, rKeys...)
}
watching := conn.GetWatching()
watchingKeys := make([]string, 0, len(watching))
for key := range watching {
watchingKeys = append(watchingKeys, key)
}
keys = append(keys, watchingKeys...)
if len(keys) == 0 {
// empty transaction or only `PING`s
return cluster.db.ExecMulti(conn, watching, cmdLines)
}
groupMap := cluster.groupBy(keys)
if len(groupMap) > 1 {
return protocol.MakeErrReply("ERR MULTI commands transaction must within one slot in cluster mode")
}
var peer string
// assert len(groupMap) == 1
for p := range groupMap {
peer = p
}
// our parser does not support protocol.MultiRawReply, so we have to encode it
if peer == cluster.self {
for _, key := range keys {
if errReply := cluster.ensureKey(key); errReply != nil {
return errReply
}
}
return cluster.db.ExecMulti(conn, watching, cmdLines)
}
return execMultiOnOtherNode(cluster, conn, peer, watching, cmdLines)
}
func execMultiOnOtherNode(cluster *Cluster, conn redis.Connection, peer string, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
defer func() {
conn.ClearQueuedCmds()
conn.SetMultiState(false)
}()
relayCmdLine := [][]byte{ // relay it to executing node
relayMultiBytes,
}
// watching commands
var watchingCmdLine = utils.ToCmdLine(innerWatch)
for key, ver := range watching {
verStr := strconv.FormatUint(uint64(ver), 10)
watchingCmdLine = append(watchingCmdLine, []byte(key), []byte(verStr))
}
relayCmdLine = append(relayCmdLine, encodeCmdLine([]CmdLine{watchingCmdLine})...)
relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...)
var rawRelayResult redis.Reply
rawRelayResult = cluster.relay(peer, connection.NewFakeConn(), relayCmdLine)
if protocol.IsErrorReply(rawRelayResult) {
return rawRelayResult
}
_, ok := rawRelayResult.(*protocol.EmptyMultiBulkReply)
if ok {
return rawRelayResult
}
relayResult, ok := rawRelayResult.(*protocol.MultiBulkReply)
if !ok {
return protocol.MakeErrReply("execute failed")
}
rep, err := parseEncodedMultiRawReply(relayResult.Args)
if err != nil {
return protocol.MakeErrReply(err.Error())
}
return rep
}
// execRelayedMulti executes a relayed multi-command transaction
// cmdLine format: multi_ watch-cmdLine base64ed-cmdLine
// result format: list of base64ed replies
func execRelayedMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 2 {
return protocol.MakeArgNumErrReply("_exec")
}
decoded, err := parseEncodedMultiRawReply(cmdLine[1:])
if err != nil {
return protocol.MakeErrReply(err.Error())
}
var txCmdLines []CmdLine
for _, rep := range decoded.Replies {
mbr, ok := rep.(*protocol.MultiBulkReply)
if !ok {
return protocol.MakeErrReply("exec failed")
}
txCmdLines = append(txCmdLines, mbr.Args)
}
watching := make(map[string]uint32)
watchCmdLine := txCmdLines[0] // format: watch_ key1 ver1 key2 ver2...
for i := 2; i < len(watchCmdLine); i += 2 {
key := string(watchCmdLine[i-1])
verStr := string(watchCmdLine[i])
ver, err := strconv.ParseUint(verStr, 10, 64)
if err != nil {
return protocol.MakeErrReply("watching command line failed")
}
watching[key] = uint32(ver)
}
rawResult := cluster.db.ExecMulti(conn, watching, txCmdLines[1:])
_, ok := rawResult.(*protocol.EmptyMultiBulkReply)
if ok {
return rawResult
}
resultMBR, ok := rawResult.(*protocol.MultiRawReply)
if !ok {
return protocol.MakeErrReply("exec failed")
}
return encodeMultiRawReply(resultMBR)
}
func execWatch(cluster *Cluster, conn redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return protocol.MakeArgNumErrReply("watch")
}
args = args[1:]
watching := conn.GetWatching()
for _, bkey := range args {
key := string(bkey)
err := cluster.ensureKey(key)
if err != nil {
return err
}
result := cluster.relayByKey(key, conn, utils.ToCmdLine("GetVer", key))
if protocol.IsErrorReply(result) {
return result
}
intResult, ok := result.(*protocol.IntReply)
if !ok {
return protocol.MakeErrReply("get version failed")
}
watching[key] = uint32(intResult.Code)
}
return protocol.MakeOkReply()
}

View File

@@ -1,48 +0,0 @@
package cluster
import (
"bytes"
"encoding/base64"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
)
func encodeCmdLine(cmdLines []CmdLine) [][]byte {
var result [][]byte
for _, line := range cmdLines {
raw := protocol.MakeMultiBulkReply(line).ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
result = append(result, encoded)
}
return result
}
func parseEncodedMultiRawReply(args [][]byte) (*protocol.MultiRawReply, error) {
cmdBuf := new(bytes.Buffer)
for _, arg := range args {
dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(arg)))
n, err := base64.StdEncoding.Decode(dbuf, arg)
if err != nil {
continue
}
cmdBuf.Write(dbuf[:n])
}
cmds, err := parser.ParseBytes(cmdBuf.Bytes())
if err != nil {
return nil, protocol.MakeErrReply(err.Error())
}
return protocol.MakeMultiRawReply(cmds), nil
}
// todo: use multi raw reply instead of base64
func encodeMultiRawReply(src *protocol.MultiRawReply) *protocol.MultiBulkReply {
args := make([][]byte, 0, len(src.Replies))
for _, rep := range src.Replies {
raw := rep.ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
args = append(args, encoded)
}
return protocol.MakeMultiBulkReply(args)
}
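A quick round-trip sketch of the two helpers above (same package assumed): command lines encoded with encodeCmdLine decode back into a MultiRawReply whose entries are the original command lines.

// Round-trip sketch for encodeCmdLine / parseEncodedMultiRawReply.
func exampleEncodeRoundTrip() error {
	cmdLines := []CmdLine{
		{[]byte("set"), []byte("k"), []byte("v")},
		{[]byte("get"), []byte("k")},
	}
	encoded := encodeCmdLine(cmdLines)                 // base64ed RESP, safe to embed as bulk strings
	decoded, err := parseEncodedMultiRawReply(encoded) // decodes back into replies
	if err != nil {
		return err
	}
	for _, rep := range decoded.Replies {
		_ = rep.ToBytes() // each reply is one of the original command lines
	}
	return nil
}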

View File

@@ -1,92 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestMultiExecOnSelf(t *testing.T) {
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := "{abc}" + utils.RandString(10)
value := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("set", key, value))
key2 := "{abc}" + utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("rpush", key2, value))
result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result)
result = testNodeA.Exec(conn, utils.ToCmdLine("get", key))
asserts.AssertBulkReply(t, result, value)
result = testNodeA.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1"))
asserts.AssertMultiBulkReply(t, result, []string{value})
}
func TestEmptyMulti(t *testing.T) {
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
result = testNodeA.Exec(conn, utils.ToCmdLine("GET", "a"))
asserts.AssertNotError(t, result)
result = testNodeA.Exec(conn, utils.ToCmdLine("EXEC"))
asserts.AssertNotError(t, result)
mbr := result.(*protocol.MultiRawReply)
asserts.AssertNullBulk(t, mbr.Replies[0])
}
func TestMultiExecOnOthers(t *testing.T) {
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := utils.RandString(10)
value := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("rpush", key, value))
testNodeA.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1"))
cmdLines := conn.GetQueuedCmdLine()
rawResp := execMultiOnOtherNode(testNodeA, conn, testNodeA.self, nil, cmdLines)
rep := rawResp.(*protocol.MultiRawReply)
if len(rep.Replies) != 2 {
t.Errorf("expect 2 replies actual %d", len(rep.Replies))
}
asserts.AssertMultiBulkReply(t, rep.Replies[1], []string{value})
}
func TestWatch(t *testing.T) {
testNodeA := testCluster[0]
for i := 0; i < 10; i++ {
conn := new(connection.FakeConn)
key := "{1}" + utils.RandString(10)
key2 := "{1}" + utils.RandString(10) // use hash tag to ensure same slot
value := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("FLUSHALL"))
testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
testNodeA.Exec(conn, utils.ToCmdLine("set", key, value))
result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
value2 := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result)
result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertNullBulk(t, result)
testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
result = testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result)
result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertBulkReply(t, result, value2)
}
}

View File

@@ -1,35 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/redis/protocol"
)
const (
relayPublish = "publish_"
)
// Publish broadcasts the message to all peers in the cluster when a publish command is received from a client
func Publish(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply {
var count int64 = 0
results := cluster.broadcast(c, modifyCmd(cmdLine, relayPublish))
for _, val := range results {
if errReply, ok := val.(protocol.ErrorReply); ok {
logger.Error("publish occurs error: " + errReply.Error())
} else if intReply, ok := val.(*protocol.IntReply); ok {
count += intReply.Code
}
}
return protocol.MakeIntReply(count)
}
// Subscribe subscribes the given connection to the given channel
func Subscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return cluster.db.Exec(c, args) // let local db.hub handle subscribe
}
// UnSubscribe removes the given connection from the given channel
func UnSubscribe(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return cluster.db.Exec(c, args) // let local db.hub handle subscribe
}

View File

@@ -1,49 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestPublish(t *testing.T) {
testNodeA := testCluster[0]
channel := utils.RandString(5)
msg := utils.RandString(5)
conn := connection.NewFakeConn()
Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel))
conn.Clean() // discard the subscribe success reply
Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data := conn.Bytes()
ret, err := parser.ParseOne(data)
if err != nil {
t.Error(err)
return
}
asserts.AssertMultiBulkReply(t, ret, []string{
"message",
channel,
msg,
})
// unsubscribe
UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE", channel))
conn.Clean()
Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data = conn.Bytes()
if len(data) > 0 {
t.Error("expect no msg")
}
// unsubscribe all
Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel))
UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE"))
conn.Clean()
Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data = conn.Bytes()
if len(data) > 0 {
t.Error("expect no msg")
}
}

File diff suppressed because it is too large

View File

@@ -187,3 +187,8 @@ func (node *Node) Propose(event *LogEntry) (uint64, error) {
}
return future.Index(), nil
}
func (node *Node) Close() error {
	future := node.inner.Shutdown()
	if err := future.Error(); err != nil {
		return fmt.Errorf("raft shutdown failed: %v", err)
	}
	return nil
}

View File

@@ -1,134 +0,0 @@
package cluster
// raft event handlers
import (
"errors"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
)
const (
eventNewNode = iota + 1
eventSetSlot
)
// the invoker should hold the raft.mu lock
func (raft *Raft) applyLogEntries(entries []*logEntry) {
for _, entry := range entries {
switch entry.Event {
case eventNewNode:
node := &Node{
ID: entry.NodeID,
Addr: entry.Addr,
}
raft.nodes[node.ID] = node
if raft.state == leader {
raft.nodeIndexMap[entry.NodeID] = &nodeStatus{
receivedIndex: entry.Index, // the new node should not receive its own join event
}
}
case eventSetSlot:
for _, slotID := range entry.SlotIDs {
slot := raft.slots[slotID]
oldNode := raft.nodes[slot.NodeID]
// remove the slot from the old node
for i, s := range oldNode.Slots {
if s.ID == slot.ID {
copy(oldNode.Slots[i:], oldNode.Slots[i+1:])
oldNode.Slots = oldNode.Slots[:len(oldNode.Slots)-1]
break
}
}
newNodeID := entry.NodeID
slot.NodeID = newNodeID
// fixme: when multiple nodes join at the same time, newNode may be nil during rebalance
newNode := raft.nodes[slot.NodeID]
newNode.Slots = append(newNode.Slots, slot)
}
}
}
if err := raft.persist(); err != nil {
logger.Errorf("persist raft error: %v", err)
}
}
// NewNode creates a new Node when another node asks the current node to join the cluster
func (raft *Raft) NewNode(addr string) (*Node, error) {
if _, ok := raft.nodes[addr]; ok {
return nil, errors.New("node existed")
}
node := &Node{
ID: addr,
Addr: addr,
}
raft.nodes[node.ID] = node
proposal := &logEntry{
Event: eventNewNode,
NodeID: node.ID,
Addr: node.Addr,
}
conn := connection.NewFakeConn()
resp := raft.cluster.relay(raft.leaderId, conn,
utils.ToCmdLine("raft", "propose", string(proposal.marshal())))
if err, ok := resp.(protocol.ErrorReply); ok {
return nil, err
}
return node, nil
}
// SetSlot proposes assigning the given slots to newNodeID
func (raft *Raft) SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply {
proposal := &logEntry{
Event: eventSetSlot,
NodeID: newNodeID,
SlotIDs: slotIDs,
}
conn := connection.NewFakeConn()
resp := raft.cluster.relay(raft.leaderId, conn,
utils.ToCmdLine("raft", "propose", string(proposal.marshal())))
if err, ok := resp.(protocol.ErrorReply); ok {
return err
}
return nil
}
// execRaftJoin handles a request from a new node to join the raft group; the current node should be the leader
// command line: raft join addr
func execRaftJoin(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 1 {
return protocol.MakeArgNumErrReply("raft join")
}
raft := cluster.asRaft()
if raft.state != leader {
leaderNode := raft.nodes[raft.leaderId]
return protocol.MakeErrReply("NOT LEADER " + leaderNode.ID + " " + leaderNode.Addr)
}
addr := string(args[0])
nodeID := addr
raft.mu.RLock()
_, exist := raft.nodes[addr]
raft.mu.RUnlock()
// if a node has joined the cluster but terminated before persisting the cluster config,
// it may try to join again at its next start.
// In this case, we only have to send it a snapshot
if !exist {
proposal := &logEntry{
Event: eventNewNode,
NodeID: nodeID,
Addr: addr,
}
if err := raft.propose(proposal); err != nil {
return err
}
}
raft.mu.RLock()
snapshot := raft.makeSnapshotForFollower(nodeID)
raft.mu.RUnlock()
return protocol.MakeMultiBulkReply(snapshot)
}
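For context, the other half of this handshake, the joining node's side, sketched from the request/response shapes used by execRaftJoin above; the (now removed) Join method is only referenced elsewhere in this commit, so the exact command spelling is an assumption.

// Sketch of the joining node's side of the handshake handled by execRaftJoin.
// selfAddr is the announce address of the joining node.
func joinViaSeedSketch(cluster *Cluster, seedAddr, selfAddr string) protocol.ErrorReply {
	conn := connection.NewFakeConn()
	resp := cluster.relay(seedAddr, conn, utils.ToCmdLine("raft", "join", selfAddr))
	if errReply, ok := resp.(protocol.ErrorReply); ok {
		return errReply // e.g. "NOT LEADER <id> <addr>": retry against the leader
	}
	snapshot, ok := resp.(*protocol.MultiBulkReply)
	if !ok {
		return protocol.MakeErrReply("unexpected join response")
	}
	raft := cluster.asRaft()
	raft.mu.Lock()
	defer raft.mu.Unlock()
	return raft.loadSnapshot(snapshot.Args)
}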

View File

@@ -1,204 +0,0 @@
package cluster
import (
"encoding/json"
"fmt"
"github.com/hdt3213/godis/redis/protocol"
"sort"
"strconv"
"strings"
)
// marshalSlotIds serializes slot ids
// For example, 1, 2, 3, 5, 7, 8 -> 1-3, 5, 7-8
func marshalSlotIds(slots []*Slot) []string {
sort.Slice(slots, func(i, j int) bool {
return slots[i].ID < slots[j].ID
})
// find continuous scopes
var scopes [][]uint32
buf := make([]uint32, 2)
var scope []uint32
for i, slot := range slots {
if len(scope) == 0 { // outside scope
if i+1 < len(slots) &&
slots[i+1].ID == slot.ID+1 { // if continuous, then start one
scope = buf
scope[0] = slot.ID
} else { // discrete number
scopes = append(scopes, []uint32{slot.ID})
}
} else { // within a scope
if i == len(slots)-1 || slots[i+1].ID != slot.ID+1 { // reach end or not continuous, stop current scope
scope[1] = slot.ID
scopes = append(scopes, []uint32{scope[0], scope[1]})
scope = nil
}
}
}
// marshal scopes
result := make([]string, 0, len(scopes))
for _, scope := range scopes {
if len(scope) == 1 {
s := strconv.Itoa(int(scope[0]))
result = append(result, s)
} else { // assert len(scope) == 2
beg := strconv.Itoa(int(scope[0]))
end := strconv.Itoa(int(scope[1]))
result = append(result, beg+"-"+end)
}
}
return result
}
// unmarshalSlotIds deserializes lines generated by marshalSlotIds
func unmarshalSlotIds(args []string) ([]uint32, error) {
var result []uint32
for i, line := range args {
if pivot := strings.IndexByte(line, '-'); pivot > 0 {
// line is a scope
beg, err := strconv.Atoi(line[:pivot])
if err != nil {
return nil, fmt.Errorf("illegal at slot line %d", i+1)
}
end, err := strconv.Atoi(line[pivot+1:])
if err != nil {
return nil, fmt.Errorf("illegal at slot line %d", i+1)
}
for j := beg; j <= end; j++ {
result = append(result, uint32(j))
}
} else {
// line is a number
v, err := strconv.Atoi(line)
if err != nil {
return nil, fmt.Errorf("illegal at slot line %d", i)
}
result = append(result, uint32(v))
}
}
return result, nil
}
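A small round-trip example of the two helpers above, matching the "1, 2, 3, 5, 7, 8 -> 1-3, 5, 7-8" comment (same package assumed):

// Round-trip sketch for marshalSlotIds / unmarshalSlotIds.
func exampleSlotIdsRoundTrip() ([]uint32, error) {
	slots := []*Slot{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 5}, {ID: 7}, {ID: 8}}
	desc := marshalSlotIds(slots) // ["1-3", "5", "7-8"]
	return unmarshalSlotIds(desc) // [1 2 3 5 7 8], nil
}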
type nodePayload struct {
ID string `json:"id"`
Addr string `json:"addr"`
SlotDesc []string `json:"slotDesc"`
Flags uint32 `json:"flags"`
}
func marshalNodes(nodes map[string]*Node) [][]byte {
var args [][]byte
for _, node := range nodes {
slotLines := marshalSlotIds(node.Slots)
payload := &nodePayload{
ID: node.ID,
Addr: node.Addr,
SlotDesc: slotLines,
Flags: node.Flags,
}
bin, _ := json.Marshal(payload)
args = append(args, bin)
}
return args
}
func unmarshalNodes(args [][]byte) (map[string]*Node, error) {
nodeMap := make(map[string]*Node)
for i, bin := range args {
payload := &nodePayload{}
err := json.Unmarshal(bin, payload)
if err != nil {
return nil, fmt.Errorf("unmarshal node failed at line %d: %v", i+1, err)
}
slotIds, err := unmarshalSlotIds(payload.SlotDesc)
if err != nil {
return nil, err
}
node := &Node{
ID: payload.ID,
Addr: payload.Addr,
Flags: payload.Flags,
}
for _, slotId := range slotIds {
node.Slots = append(node.Slots, &Slot{
ID: slotId,
NodeID: node.ID,
Flags: 0,
})
}
nodeMap[node.ID] = node
}
return nodeMap, nil
}
// makeSnapshot generates a snapshot of the current topology
// the invoker should hold the lock
func (raft *Raft) makeSnapshot() [][]byte {
topology := marshalNodes(raft.nodes)
snapshot := [][]byte{
[]byte(raft.selfNodeID),
[]byte(strconv.Itoa(int(raft.state))),
[]byte(raft.leaderId),
[]byte(strconv.Itoa(raft.term)),
[]byte(strconv.Itoa(raft.committedIndex)),
}
snapshot = append(snapshot, topology...)
return snapshot
}
// makeSnapshotForFollower is used by the leader to generate a snapshot for a follower
// the invoker should hold the lock
func (raft *Raft) makeSnapshotForFollower(followerId string) [][]byte {
snapshot := raft.makeSnapshot()
snapshot[0] = []byte(followerId)
snapshot[1] = []byte(strconv.Itoa(int(follower)))
return snapshot
}
// loadSnapshot applies a snapshot; the invoker should hold the lock
func (raft *Raft) loadSnapshot(snapshot [][]byte) protocol.ErrorReply {
	// make sure raft.slots and node.Slots refer to the same objects
selfNodeId := string(snapshot[0])
state0, err := strconv.Atoi(string(snapshot[1]))
if err != nil {
return protocol.MakeErrReply("illegal state: " + string(snapshot[1]))
}
state := raftState(state0)
if _, ok := stateNames[state]; !ok {
return protocol.MakeErrReply("unknown state: " + strconv.Itoa(int(state)))
}
leaderId := string(snapshot[2])
term, err := strconv.Atoi(string(snapshot[3]))
if err != nil {
return protocol.MakeErrReply("illegal term: " + string(snapshot[3]))
}
commitIndex, err := strconv.Atoi(string(snapshot[4]))
if err != nil {
return protocol.MakeErrReply("illegal commit index: " + string(snapshot[3]))
}
nodes, err := unmarshalNodes(snapshot[5:])
if err != nil {
return protocol.MakeErrReply(err.Error())
}
raft.selfNodeID = selfNodeId
raft.state = state
raft.leaderId = leaderId
raft.term = term
raft.committedIndex = commitIndex
raft.proposedIndex = commitIndex
raft.initLog(term, commitIndex, nil)
raft.slots = make([]*Slot, slotCount)
for _, node := range nodes {
for _, slot := range node.Slots {
raft.slots[int(slot.ID)] = slot
}
if node.getState() == leader {
raft.leaderId = node.ID
}
}
raft.nodes = nodes
return nil
}
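// A small reading aid (hypothetical helper, not part of the original file): the snapshot
// built by makeSnapshot and consumed by loadSnapshot is a positional [][]byte, so the
// fixed header fields can be decoded by index alone; the remaining entries are the JSON
// node payloads produced by marshalNodes. Assumes only this file's existing imports.
func describeSnapshotHeader(snapshot [][]byte) string {
	if len(snapshot) < 5 {
		return "malformed snapshot"
	}
	return fmt.Sprintf("self=%s state=%s leader=%s term=%s committedIndex=%s nodes=%d",
		snapshot[0], snapshot[1], snapshot[2], snapshot[3], snapshot[4], len(snapshot)-5)
}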


@@ -1 +0,0 @@
package cluster


@@ -1,140 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
// Rename renames a key; the source and destination keys may be hosted on different nodes
func Rename(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply {
if len(cmdLine) != 3 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command")
}
srcKey := string(cmdLine[1])
destKey := string(cmdLine[2])
srcNode := cluster.pickNodeAddrByKey(srcKey)
destNode := cluster.pickNodeAddrByKey(destKey)
	if srcNode == destNode { // fast path: both keys are hosted on the same node
return cluster.relay(srcNode, c, modifyCmd(cmdLine, "Rename_"))
}
groupMap := map[string][]string{
srcNode: {srcKey},
destNode: {destKey},
}
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
// prepare rename from
srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey))
if protocol.IsErrorReply(srcPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return srcPrepareResp
}
srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
if !ok || len(srcPrepareMBR.Args) < 2 {
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return protocol.MakeErrReply("ERR invalid prepare response")
}
// prepare rename to
destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
[]byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]))
if protocol.IsErrorReply(destPrepareResp) {
		// roll back both nodes
requestRollback(cluster, c, txID, groupMap)
return destPrepareResp
}
if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
requestRollback(cluster, c, txID, groupMap)
return errReply
}
return protocol.MakeOkReply()
}
// prepareRenameFrom is the prepare function for RenameFrom, see prepareFuncMap
func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeArgNumErrReply("RenameFrom")
}
key := string(cmdLine[1])
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
if protocol.IsErrorReply(existResp) {
return existResp
}
existIntResp := existResp.(*protocol.IntReply)
if existIntResp.Code == 0 {
return protocol.MakeErrReply("ERR no such key")
}
return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key))
}
func prepareRenameNxTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 4 {
return protocol.MakeArgNumErrReply("RenameNxTo")
}
key := string(cmdLine[1])
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
if protocol.IsErrorReply(existResp) {
return existResp
}
existIntResp := existResp.(*protocol.IntReply)
if existIntResp.Code == 1 {
return protocol.MakeErrReply(keyExistsErr)
}
return protocol.MakeOkReply()
}
func init() {
registerPrepareFunc("RenameFrom", prepareRenameFrom)
registerPrepareFunc("RenameNxTo", prepareRenameNxTo)
}
// RenameNx renames a key only if the new key does not exist.
// The source and destination keys may be hosted on different nodes
func RenameNx(cluster *Cluster, c redis.Connection, cmdLine [][]byte) redis.Reply {
if len(cmdLine) != 3 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
}
srcKey := string(cmdLine[1])
destKey := string(cmdLine[2])
srcNode := cluster.pickNodeAddrByKey(srcKey)
destNode := cluster.pickNodeAddrByKey(destKey)
if srcNode == destNode {
return cluster.relay(srcNode, c, modifyCmd(cmdLine, "RenameNX_"))
}
groupMap := map[string][]string{
srcNode: {srcKey},
destNode: {destKey},
}
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
// prepare rename from
srcPrepareResp := cluster.relay(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey))
if protocol.IsErrorReply(srcPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return srcPrepareResp
}
srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
if !ok || len(srcPrepareMBR.Args) < 2 {
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return protocol.MakeErrReply("ERR invalid prepare response")
}
// prepare rename to
destPrepareResp := cluster.relay(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
[]byte("RenameNxTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]))
if protocol.IsErrorReply(destPrepareResp) {
		// roll back both nodes
requestRollback(cluster, c, txID, groupMap)
if re := destPrepareResp.(protocol.ErrorReply); re.Error() == keyExistsErr {
return protocol.MakeIntReply(0)
}
return destPrepareResp
}
if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
requestRollback(cluster, c, txID, groupMap)
return errReply
}
return protocol.MakeIntReply(1)
}


@@ -1,59 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestRename(t *testing.T) {
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
// cross node rename
for i := 0; i < 10; i++ {
testNodeA.Exec(conn, utils.ToCmdLine("FlushALL"))
key := utils.RandString(10)
value := utils.RandString(10)
newKey := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "100000"))
result := testNodeA.Exec(conn, utils.ToCmdLine("RENAME", key, newKey))
asserts.AssertStatusReply(t, result, "OK")
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 0)
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", newKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
}
}
func TestRenameNx(t *testing.T) {
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
// cross node rename
for i := 0; i < 10; i++ {
testNodeA.Exec(conn, utils.ToCmdLine("FlushALL"))
key := utils.RandString(10)
value := utils.RandString(10)
newKey := utils.RandString(10)
testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "100000"))
result := testNodeA.Exec(conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 0)
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.Exec(conn, utils.ToCmdLine("TTL", newKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
value2 := value + "ccc"
testNodeA.Exec(conn, utils.ToCmdLine("SET", key, value2, "ex", "100000"))
result = testNodeA.Exec(conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertIntReply(t, result, 0)
result = testNodeA.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
}
}


@@ -1,166 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"strings"
)
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte
var router = make(map[string]CmdFunc)
func registerCmd(name string, cmd CmdFunc) {
name = strings.ToLower(name)
router[name] = cmd
}
func registerDefaultCmd(name string) {
registerCmd(name, defaultFunc)
}
// defaultFunc relays the command to the peer responsible for the key and returns its reply to the client
func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
key := string(args[1])
slotId := getSlot(key)
peer := cluster.pickNode(slotId)
if peer.ID == cluster.self {
err := cluster.ensureKeyWithoutLock(key)
if err != nil {
return err
}
		// execute on the local db
		return cluster.db.Exec(c, args)
}
return cluster.relay(peer.ID, c, args)
}
func init() {
registerCmd("Ping", ping)
registerCmd("Prepare", execPrepare)
registerCmd("Commit", execCommit)
registerCmd("Rollback", execRollback)
registerCmd("Del", Del)
registerCmd("Rename", Rename)
registerCmd("RenameNx", RenameNx)
registerCmd("Copy", Copy)
registerCmd("MSet", MSet)
registerCmd("MGet", MGet)
registerCmd("MSetNx", MSetNX)
registerCmd("Publish", Publish)
registerCmd("Subscribe", Subscribe)
registerCmd("Unsubscribe", UnSubscribe)
registerCmd("FlushDB", FlushDB)
registerCmd("FlushAll", FlushAll)
registerCmd(relayMulti, execRelayedMulti)
registerCmd("Watch", execWatch)
registerCmd("FlushDB_", genPenetratingExecutor("FlushDB"))
registerCmd("Copy_", genPenetratingExecutor("Copy"))
registerCmd("Watch_", genPenetratingExecutor("Watch"))
registerCmd(relayPublish, genPenetratingExecutor("Publish"))
registerCmd("Del_", genPenetratingExecutor("Del"))
registerCmd("MSet_", genPenetratingExecutor("MSet"))
registerCmd("MSetNx_", genPenetratingExecutor("MSetNx"))
registerCmd("MGet_", genPenetratingExecutor("MGet"))
registerCmd("Rename_", genPenetratingExecutor("Rename"))
registerCmd("RenameNx_", genPenetratingExecutor("RenameNx"))
registerCmd("DumpKey_", genPenetratingExecutor("DumpKey"))
defaultCmds := []string{
"expire",
"expireAt",
"pExpire",
"pExpireAt",
"ttl",
"PTtl",
"persist",
"exists",
"type",
"set",
"setNx",
"setEx",
"pSetEx",
"get",
"getEx",
"getSet",
"getDel",
"incr",
"incrBy",
"incrByFloat",
"decr",
"decrBy",
"lPush",
"lPushX",
"rPush",
"rPushX",
"LPop",
"RPop",
"LRem",
"LLen",
"LIndex",
"LSet",
"LRange",
"HSet",
"HSetNx",
"HGet",
"HExists",
"HDel",
"HLen",
"HStrLen",
"HMGet",
"HMSet",
"HKeys",
"HVals",
"HGetAll",
"HIncrBy",
"HIncrByFloat",
"HRandField",
"SAdd",
"SIsMember",
"SRem",
"SPop",
"SCard",
"SMembers",
"SInter",
"SInterStore",
"SUnion",
"SUnionStore",
"SDiff",
"SDiffStore",
"SRandMember",
"ZAdd",
"ZScore",
"ZIncrBy",
"ZRank",
"ZCount",
"ZRevRank",
"ZCard",
"ZRange",
"ZRevRange",
"ZRangeByScore",
"ZRevRangeByScore",
"ZRem",
"ZRemRangeByScore",
"ZRemRangeByRank",
"GeoAdd",
"GeoPos",
"GeoDist",
"GeoHash",
"GeoRadius",
"GeoRadiusByMember",
"GetVer",
"DumpKey",
}
for _, name := range defaultCmds {
registerDefaultCmd(name)
}
}
// genPenetratingExecutor generates an executor that bypasses cluster routing and executes directly on the local database layer
func genPenetratingExecutor(realCmd string) CmdFunc {
return func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, modifyCmd(cmdLine, realCmd))
}
}
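// A minimal sketch of the registration pattern above ("ExampleCmd" is a hypothetical
// command, not part of the original router): the plain name is routed by key through
// defaultFunc, while the "_"-suffixed variant is registered as a penetrating executor,
// so a command relayed by another node is executed on the local db instead of being
// routed again.
func registerExampleCmd() {
	registerDefaultCmd("ExampleCmd")                                 // routed to the node hosting the key
	registerCmd("ExampleCmd_", genPenetratingExecutor("ExampleCmd")) // executed locally when relayed
}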


@@ -1,253 +0,0 @@
package cluster
import (
"fmt"
"github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/timewheel"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
"strings"
"sync"
"time"
)
// A prepareFunc is executed after the related keys have been locked; it applies additional checks to decide whether the transaction can be committed
// For example, prepareMSetNX returns an error to prevent an MSetNx transaction from committing if any related key already exists
var prepareFuncMap = make(map[string]CmdFunc)
func registerPrepareFunc(cmdName string, fn CmdFunc) {
prepareFuncMap[strings.ToLower(cmdName)] = fn
}
// Transaction stores state and data for a try-commit-catch distributed transaction
type Transaction struct {
id string // transaction id
	cmdLine [][]byte // command line of this transaction
cluster *Cluster
conn redis.Connection
dbIndex int
writeKeys []string
readKeys []string
keysLocked bool
undoLog []CmdLine
status int8
mu *sync.Mutex
}
const (
maxLockTime = 3 * time.Second
waitBeforeCleanTx = 2 * maxLockTime
createdStatus = 0
preparedStatus = 1
committedStatus = 2
rolledBackStatus = 3
)
func genTaskKey(txID string) string {
return "tx:" + txID
}
// NewTransaction creates a try-commit-catch distributed transaction
func NewTransaction(cluster *Cluster, c redis.Connection, id string, cmdLine [][]byte) *Transaction {
return &Transaction{
id: id,
cmdLine: cmdLine,
cluster: cluster,
conn: c,
dbIndex: c.GetDBIndex(),
status: createdStatus,
mu: new(sync.Mutex),
}
}
// lockKeys is reentrant
// the invoker should hold tx.mu
func (tx *Transaction) lockKeys() {
if !tx.keysLocked {
tx.cluster.db.RWLocks(tx.dbIndex, tx.writeKeys, tx.readKeys)
tx.keysLocked = true
}
}
func (tx *Transaction) unLockKeys() {
if tx.keysLocked {
tx.cluster.db.RWUnLocks(tx.dbIndex, tx.writeKeys, tx.readKeys)
tx.keysLocked = false
}
}
// prepare locks the related keys and builds the undo log; tx.id and tx.cmdLine should be set before calling
func (tx *Transaction) prepare() error {
tx.mu.Lock()
defer tx.mu.Unlock()
tx.writeKeys, tx.readKeys = database.GetRelatedKeys(tx.cmdLine)
// lock writeKeys
tx.lockKeys()
for _, key := range tx.writeKeys {
err := tx.cluster.ensureKey(key)
if err != nil {
return err
}
}
for _, key := range tx.readKeys {
err := tx.cluster.ensureKey(key)
if err != nil {
return err
}
}
// build undoLog
tx.undoLog = tx.cluster.db.GetUndoLogs(tx.dbIndex, tx.cmdLine)
tx.status = preparedStatus
taskKey := genTaskKey(tx.id)
timewheel.Delay(maxLockTime, taskKey, func() {
		if tx.status == preparedStatus { // roll back the transaction if it is still uncommitted when the lock expires
logger.Info("abort transaction: " + tx.id)
tx.mu.Lock()
defer tx.mu.Unlock()
_ = tx.rollbackWithLock()
}
})
return nil
}
func (tx *Transaction) rollbackWithLock() error {
	// the invoker holds tx.mu, so tx.status cannot be changed by another goroutine
	if tx.status == rolledBackStatus { // no need to roll back a rolled-back transaction
		return nil
	}
tx.lockKeys()
for _, cmdLine := range tx.undoLog {
tx.cluster.db.ExecWithLock(tx.conn, cmdLine)
}
tx.unLockKeys()
tx.status = rolledBackStatus
return nil
}
// cmdLine: Prepare id cmdName args...
func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 3 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'prepare' command")
}
txID := string(cmdLine[1])
cmdName := strings.ToLower(string(cmdLine[2]))
tx := NewTransaction(cluster, c, txID, cmdLine[2:])
cluster.transactionMu.Lock()
cluster.transactions.Put(txID, tx)
cluster.transactionMu.Unlock()
err := tx.prepare()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
prepareFunc, ok := prepareFuncMap[cmdName]
if ok {
return prepareFunc(cluster, c, cmdLine[2:])
}
return &protocol.OkReply{}
}
// execRollback rolls back the local transaction
func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
}
txID := string(cmdLine[1])
cluster.transactionMu.RLock()
raw, ok := cluster.transactions.Get(txID)
cluster.transactionMu.RUnlock()
if !ok {
return protocol.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
tx.mu.Lock()
defer tx.mu.Unlock()
err := tx.rollbackWithLock()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
// clean transaction
timewheel.Delay(waitBeforeCleanTx, "", func() {
cluster.transactionMu.Lock()
cluster.transactions.Remove(tx.id)
cluster.transactionMu.Unlock()
})
return protocol.MakeIntReply(1)
}
// execCommit commits the local transaction as a worker when receiving the commit command from the coordinator
func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'commit' command")
}
txID := string(cmdLine[1])
cluster.transactionMu.RLock()
raw, ok := cluster.transactions.Get(txID)
cluster.transactionMu.RUnlock()
if !ok {
return protocol.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
tx.mu.Lock()
defer tx.mu.Unlock()
result := cluster.db.ExecWithLock(c, tx.cmdLine)
if protocol.IsErrorReply(result) {
// failed
err2 := tx.rollbackWithLock()
return protocol.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
}
// after committed
tx.unLockKeys()
tx.status = committedStatus
// clean finished transaction
// do not clean immediately, in case rollback
timewheel.Delay(waitBeforeCleanTx, "", func() {
cluster.transactionMu.Lock()
cluster.transactions.Remove(tx.id)
cluster.transactionMu.Unlock()
})
return result
}
// requestCommit, as the coordinator, asks all nodes to commit the transaction
func requestCommit(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) ([]redis.Reply, protocol.ErrorReply) {
var errReply protocol.ErrorReply
txIDStr := strconv.FormatInt(txID, 10)
respList := make([]redis.Reply, 0, len(groupMap))
for node := range groupMap {
resp := cluster.relay(node, c, makeArgs("commit", txIDStr))
if protocol.IsErrorReply(resp) {
errReply = resp.(protocol.ErrorReply)
break
}
respList = append(respList, resp)
}
if errReply != nil {
requestRollback(cluster, c, txID, groupMap)
return nil, errReply
}
return respList, nil
}
// requestRollback, as the coordinator, asks all nodes to roll back the transaction
// groupMap: node -> keys
func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) {
txIDStr := strconv.FormatInt(txID, 10)
for node := range groupMap {
cluster.relay(node, c, makeArgs("rollback", txIDStr))
}
}
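// A condensed coordinator-side sketch (hypothetical helper, not part of the original
// file) of the try-commit-catch flow implemented above: prepare on every involved node,
// then commit; if any step fails, ask all nodes to roll back. It reuses this package's
// relay, makeArgs, requestCommit and requestRollback helpers and this file's imports.
func coordinateSimpleTx(cluster *Cluster, c redis.Connection, cmdName string, groupMap map[string][]string) redis.Reply {
	txID := cluster.idGenerator.NextID()
	txIDStr := strconv.FormatInt(txID, 10)
	for node, keys := range groupMap {
		// cmdLine: Prepare <txID> <cmdName> <keys...>
		args := append([]string{txIDStr, cmdName}, keys...)
		resp := cluster.relay(node, c, makeArgs("Prepare", args...))
		if protocol.IsErrorReply(resp) {
			requestRollback(cluster, c, txID, groupMap)
			return resp
		}
	}
	if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
		// requestCommit already asks all nodes to roll back on failure
		return errReply
	}
	return protocol.MakeOkReply()
}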


@@ -1,50 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"math/rand"
"strconv"
"testing"
)
func TestRollback(t *testing.T) {
// rollback uncommitted transaction
testNodeA := testCluster[0]
conn := new(connection.FakeConn)
FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
txID := rand.Int63()
txIDStr := strconv.FormatInt(txID, 10)
keys := []string{"a", "{a}1"}
groupMap := map[string][]string{
testNodeA.self: keys,
}
args := []string{txIDStr, "DEL"}
args = append(args, keys...)
testNodeA.db.Exec(conn, toArgs("SET", "a", "a"))
ret := execPrepare(testNodeA, conn, makeArgs("Prepare", args...))
asserts.AssertNotError(t, ret)
requestRollback(testNodeA, conn, txID, groupMap)
ret = testNodeA.db.Exec(conn, toArgs("GET", "a"))
asserts.AssertBulkReply(t, ret, "a")
// rollback committed transaction
FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
testNodeA.db.Exec(conn, toArgs("SET", "a", "a"))
txID = rand.Int63()
txIDStr = strconv.FormatInt(txID, 10)
args = []string{txIDStr, "DEL"}
args = append(args, keys...)
ret = execPrepare(testNodeA, conn, makeArgs("Prepare", args...))
asserts.AssertNotError(t, ret)
_, err := requestCommit(testNodeA, conn, txID, groupMap)
if err != nil {
t.Errorf("del failed %v", err)
return
}
ret = testNodeA.db.Exec(conn, toArgs("GET", "a")) // call db.Exec to skip key router
asserts.AssertNullBulk(t, ret)
requestRollback(testNodeA, conn, txID, groupMap)
ret = testNodeA.db.Exec(conn, toArgs("GET", "a"))
asserts.AssertBulkReply(t, ret, "a")
}


@@ -1,189 +0,0 @@
package cluster
import (
"fmt"
"github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
"time"
)
func (cluster *Cluster) startAsSeed(listenAddr string) protocol.ErrorReply {
err := cluster.topology.StartAsSeed(listenAddr)
if err != nil {
return err
}
for i := 0; i < slotCount; i++ {
cluster.initSlot(uint32(i), slotStateHost)
}
return nil
}
// Join joins the cluster through the given seed node
func (cluster *Cluster) Join(seed string) protocol.ErrorReply {
err := cluster.topology.Join(seed)
if err != nil {
		return err
}
	/* asynchronously migrate slots after joining */
go func() {
		time.Sleep(time.Second) // wait for the cluster to start up
cluster.reBalance()
}()
return nil
}
var errConfigFileNotExist = protocol.MakeErrReply("cluster config file not exist")
// LoadConfig tries to load the cluster config file and re-join the cluster
func (cluster *Cluster) LoadConfig() protocol.ErrorReply {
err := cluster.topology.LoadConfigFile()
if err != nil {
return err
}
selfNodeId := cluster.topology.GetSelfNodeID()
selfNode := cluster.topology.GetNode(selfNodeId)
if selfNode == nil {
return protocol.MakeErrReply("ERR self node info not found")
}
for _, slot := range selfNode.Slots {
cluster.initSlot(slot.ID, slotStateHost)
}
return nil
}
func (cluster *Cluster) reBalance() {
nodes := cluster.topology.GetNodes()
var slotIDs []uint32
var slots []*Slot
reqDonateCmdLine := utils.ToCmdLine("gcluster", "request-donate", cluster.self)
for _, node := range nodes {
if node.ID == cluster.self {
continue
}
node := node
peerCli, err := cluster.clientFactory.GetPeerClient(node.Addr)
if err != nil {
logger.Errorf("get client of %s failed: %v", node.Addr, err)
continue
}
resp := peerCli.Send(reqDonateCmdLine)
payload, ok := resp.(*protocol.MultiBulkReply)
if !ok {
logger.Errorf("request donate to %s failed: %v", node.Addr, err)
continue
}
for _, bin := range payload.Args {
slotID64, err := strconv.ParseUint(string(bin), 10, 64)
if err != nil {
continue
}
slotID := uint32(slotID64)
slotIDs = append(slotIDs, slotID)
slots = append(slots, &Slot{
ID: slotID,
NodeID: node.ID,
})
// Raft cannot guarantee the simultaneity and order of submissions to the source and destination nodes
// In some cases the source node thinks the slot belongs to the destination node, and the destination node thinks the slot belongs to the source node
			// To avoid it, the source node and the destination node must reach a consensus before proposing to raft
cluster.setLocalSlotImporting(slotID, node.ID)
}
}
if len(slots) == 0 {
return
}
logger.Infof("received %d donated slots", len(slots))
// change route
err := cluster.topology.SetSlot(slotIDs, cluster.self)
if err != nil {
logger.Errorf("set slot route failed: %v", err)
return
}
slotChan := make(chan *Slot, len(slots))
for _, slot := range slots {
slotChan <- slot
}
close(slotChan)
for i := 0; i < 4; i++ {
i := i
go func() {
for slot := range slotChan {
logger.Info("start import slot ", slot.ID)
err := cluster.importSlot(slot)
if err != nil {
logger.Error(fmt.Sprintf("import slot %d error: %v", slot.ID, err))
// delete all imported keys in slot
cluster.cleanDroppedSlot(slot.ID)
// todo: recover route
return
}
logger.Infof("finish import slot: %d, about %d slots remains", slot.ID, len(slotChan))
}
logger.Infof("import worker %d exited", i)
}()
}
}
// importSlot migrates a slot into the current node
// the `slot` parameter carries the slot ID and the former host node ID
func (cluster *Cluster) importSlot(slot *Slot) error {
node := cluster.topology.GetNode(slot.NodeID)
/* get migrate stream */
migrateCmdLine := utils.ToCmdLine(
"gcluster", "migrate", strconv.Itoa(int(slot.ID)))
migrateStream, err := cluster.clientFactory.NewStream(node.Addr, migrateCmdLine)
if err != nil {
return err
}
defer migrateStream.Close()
fakeConn := connection.NewFakeConn()
slotLoop:
for proto := range migrateStream.Stream() {
if proto.Err != nil {
return fmt.Errorf("set slot %d error: %v", slot.ID, err)
}
switch reply := proto.Data.(type) {
case *protocol.MultiBulkReply:
// todo: handle exec error
keys, _ := database.GetRelatedKeys(reply.Args)
// assert len(keys) == 1
key := keys[0]
			// the key may have been imported by Cluster.ensureKey or by a former failed migration attempt
if !cluster.isImportedKey(key) {
cluster.setImportedKey(key)
_ = cluster.db.Exec(fakeConn, reply.Args)
}
case *protocol.StatusReply:
if protocol.IsOKReply(reply) {
break slotLoop
} else {
// todo: return slot to former host node
msg := fmt.Sprintf("migrate slot %d error: %s", slot.ID, reply.Status)
logger.Errorf(msg)
return protocol.MakeErrReply(msg)
}
case protocol.ErrorReply:
// todo: return slot to former host node
msg := fmt.Sprintf("migrate slot %d error: %s", slot.ID, reply.Error())
logger.Errorf(msg)
return protocol.MakeErrReply(msg)
}
}
cluster.finishSlotImport(slot.ID)
// finish migration mode
peerCli, err := cluster.clientFactory.GetPeerClient(node.Addr)
if err != nil {
return err
}
defer cluster.clientFactory.ReturnPeerClient(node.Addr, peerCli)
peerCli.Send(utils.ToCmdLine("gcluster", "migrate-done", strconv.Itoa(int(slot.ID))))
return nil
}


@@ -1,146 +0,0 @@
package cluster
import (
"fmt"
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
"strings"
)
func init() {
registerCmd("gcluster", execGCluster)
}
func execGCluster(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return protocol.MakeArgNumErrReply("gcluster")
}
subCmd := strings.ToLower(string(args[1]))
switch subCmd {
case "set-slot":
// Command line: gcluster set-slot <slotID> <targetNodeID>
		// Another node requests the current node to migrate a slot to it.
		// The current node will mark the slot as migrating.
		// After this function returns, all requests for the target slot will be routed to the target node
return execGClusterSetSlot(cluster, c, args[2:])
case "migrate":
// Command line: gcluster migrate <slotId>
// Current node will dump the given slot to the node sending this request
		// The given slot must be in migrating state
return execGClusterMigrate(cluster, c, args[2:])
case "migrate-done":
// command line: gcluster migrate-done <slotId>
		// The new node hosting the given slot tells the current node that migration has finished and the remaining data can be deleted
return execGClusterMigrateDone(cluster, c, args[2:])
case "request-donate":
		// command line: gcluster request-donate <nodeID>
		// picks some slots and gives them to the calling node for load balancing
return execGClusterDonateSlot(cluster, c, args[2:])
}
return protocol.MakeErrReply(" ERR unknown gcluster sub command '" + subCmd + "'")
}
// execGClusterSetSlot marks a slot hosted by the current node as moving out (migrating)
// args is [slotID, newNodeId]
func execGClusterSetSlot(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 2 {
return protocol.MakeArgNumErrReply("gcluster")
}
slotId0, err := strconv.Atoi(string(args[0]))
	if err != nil || slotId0 < 0 || slotId0 >= slotCount {
return protocol.MakeErrReply("ERR value is not a valid slot id")
}
slotId := uint32(slotId0)
targetNodeID := string(args[1])
targetNode := cluster.topology.GetNode(targetNodeID)
if targetNode == nil {
return protocol.MakeErrReply("ERR node not found")
}
cluster.setSlotMovingOut(slotId, targetNodeID)
logger.Info(fmt.Sprintf("set slot %d to node %s", slotId, targetNodeID))
return protocol.MakeOkReply()
}
// execGClusterDonateSlot picks some slots and gives them to the calling node for load balancing
// args is [callingNodeId]
func execGClusterDonateSlot(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
	if len(args) != 1 {
		return protocol.MakeArgNumErrReply("gcluster")
	}
	targetNodeID := string(args[0])
nodes := cluster.topology.GetNodes() // including the new node
avgSlot := slotCount / len(nodes)
cluster.slotMu.Lock()
defer cluster.slotMu.Unlock()
limit := len(cluster.slots) - avgSlot
if limit <= 0 {
return protocol.MakeEmptyMultiBulkReply()
}
result := make([][]byte, 0, limit)
	// rely on the randomness of Go's map iteration order to randomly select slots
for slotID, slot := range cluster.slots {
if slot.state == slotStateHost {
slot.state = slotStateMovingOut
slot.newNodeID = targetNodeID
slotIDBin := []byte(strconv.FormatUint(uint64(slotID), 10))
result = append(result, slotIDBin)
if len(result) == limit {
break
}
}
}
return protocol.MakeMultiBulkReply(result)
}
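// A small arithmetic sketch (hypothetical helper, not part of the original file) of the
// donation limit used above: a node gives away at most the slots it hosts beyond the
// per-node average. For example, with 16384 slots, a node hosting 8192 of them would
// donate 8192 - 16384/3 = 2731 slots when a third node joins.
func donationLimit(hostedSlots, nodeCount int) int {
	limit := hostedSlots - slotCount/nodeCount
	if limit < 0 {
		return 0
	}
	return limit
}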
// execGClusterMigrate Command line: gcluster migrate slotId
// Current node will dump data in the given slot to the node sending this request
// The given slot must be in migrating state
func execGClusterMigrate(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
slotId0, err := strconv.Atoi(string(args[0]))
	if err != nil || slotId0 < 0 || slotId0 >= slotCount {
return protocol.MakeErrReply("ERR value is not a valid slot id")
}
slotId := uint32(slotId0)
slot := cluster.getHostSlot(slotId)
if slot == nil || slot.state != slotStateMovingOut {
return protocol.MakeErrReply("ERR only dump migrating slot")
}
// migrating slot is immutable
logger.Info("start dump slot", slotId)
slot.keys.ForEach(func(key string) bool {
entity, ok := cluster.db.GetEntity(0, key)
if ok {
ret := aof.EntityToCmd(key, entity)
// todo: handle error and close connection
_, _ = c.Write(ret.ToBytes())
expire := cluster.db.GetExpiration(0, key)
if expire != nil {
ret = aof.MakeExpireCmd(key, *expire)
_, _ = c.Write(ret.ToBytes())
}
}
return true
})
logger.Info("finish dump slot ", slotId)
	// send an OK reply to tell the requesting node that the dump has finished
return protocol.MakeOkReply()
}
// execGClusterMigrateDone command line: gcluster migrate-done <slotId>
func execGClusterMigrateDone(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
slotId0, err := strconv.Atoi(string(args[0]))
	if err != nil || slotId0 < 0 || slotId0 >= slotCount {
return protocol.MakeErrReply("ERR value is not a valid slot id")
}
slotId := uint32(slotId0)
slot := cluster.getHostSlot(slotId)
if slot == nil || slot.state != slotStateMovingOut {
return protocol.MakeErrReply("ERR slot is not moving out")
}
cluster.cleanDroppedSlot(slotId)
cluster.slotMu.Lock()
delete(cluster.slots, slotId)
cluster.slotMu.Unlock()
return protocol.MakeOkReply()
}


@@ -1,58 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/redis/protocol"
"hash/crc32"
"strings"
"time"
)
// Slot represents a hash slot, used in cluster internal messages
type Slot struct {
// ID is uint between 0 and 16383
ID uint32
// NodeID is id of the hosting node
// If the slot is migrating, NodeID is the id of the node importing this slot (target node)
NodeID string
// Flags stores more information of slot
Flags uint32
}
// getPartitionKey extracts the hash tag (the part between the first "{" and the next "}")
func getPartitionKey(key string) string {
	beg := strings.Index(key, "{")
	if beg == -1 {
		return key
	}
	end := strings.Index(key[beg+1:], "}")
	if end <= 0 { // no closing brace after "{", or empty hash tag
		return key
	}
	return key[beg+1 : beg+1+end]
}
func getSlot(key string) uint32 {
partitionKey := getPartitionKey(key)
return crc32.ChecksumIEEE([]byte(partitionKey)) % uint32(slotCount)
}
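// A minimal usage sketch (hypothetical helper, not part of the original file): keys that
// share a hash tag are routed to the same slot, which is what lets multi-key commands
// and the tests keep keys such as "a" and "{a}1" on a single node.
func demoHashTagRouting() bool {
	// only the "42" inside the braces is hashed, so both keys land on the same slot
	return getSlot("user:{42}:name") == getSlot("user:{42}:age")
}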
// Node represents a node and its slots, used in cluster internal messages
type Node struct {
ID string
Addr string
Slots []*Slot // ascending order by slot id
Flags uint32
lastHeard time.Time
}
type topology interface {
GetSelfNodeID() string
GetNodes() []*Node // return a copy
GetNode(nodeID string) *Node
GetSlots() []*Slot
StartAsSeed(addr string) protocol.ErrorReply
SetSlot(slotIDs []uint32, newNodeID string) protocol.ErrorReply
LoadConfigFile() protocol.ErrorReply
Join(seed string) protocol.ErrorReply
Close() error
}


@@ -1,152 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/config"
database2 "github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/datastruct/dict"
"github.com/hdt3213/godis/lib/idgenerator"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"path"
"strconv"
"testing"
"time"
)
func makeTestRaft(addresses []string, timeoutFlags []bool, persistFilenames []string) ([]*Cluster, error) {
nodes := make([]*Cluster, len(addresses))
factory := &testClientFactory{
nodes: nodes,
timeoutFlags: timeoutFlags,
}
for i, addr := range addresses {
addr := addr
nodes[i] = &Cluster{
self: addr,
addr: addr,
db: database2.NewStandaloneServer(),
transactions: dict.MakeSimple(),
idGenerator: idgenerator.MakeGenerator(config.Properties.Self),
clientFactory: factory,
slots: make(map[uint32]*hostSlot),
}
topologyPersistFile := persistFilenames[i]
nodes[i].topology = newRaft(nodes[i], topologyPersistFile)
}
err := nodes[0].startAsSeed(addresses[0])
if err != nil {
return nil, err
}
err = nodes[1].Join(addresses[0])
if err != nil {
return nil, err
}
err = nodes[2].Join(addresses[0])
if err != nil {
return nil, err
}
return nodes, nil
}
func TestRaftStart(t *testing.T) {
addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"}
timeoutFlags := []bool{false, false, false}
persistFilenames := []string{"", "", ""}
nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames)
if err != nil {
t.Error(err)
return
}
if nodes[0].asRaft().state != leader {
t.Error("expect leader")
return
}
if nodes[1].asRaft().state != follower {
t.Error("expect follower")
return
}
if nodes[2].asRaft().state != follower {
t.Error("expect follower")
return
}
size := 100
conn := connection.NewFakeConn()
for i := 0; i < size; i++ {
str := strconv.Itoa(i)
result := nodes[0].Exec(conn, utils.ToCmdLine("SET", str, str))
asserts.AssertNotError(t, result)
}
for i := 0; i < size; i++ {
str := strconv.Itoa(i)
result := nodes[0].Exec(conn, utils.ToCmdLine("Get", str))
asserts.AssertBulkReply(t, result, str)
}
for _, node := range nodes {
_ = node.asRaft().Close()
}
}
func TestRaftElection(t *testing.T) {
addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"}
timeoutFlags := []bool{false, false, false}
persistFilenames := []string{"", "", ""}
nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames)
if err != nil {
t.Error(err)
return
}
nodes[0].asRaft().Close()
time.Sleep(3 * electionTimeoutMaxMs * time.Millisecond) // wait for leader timeout
//<-make(chan struct{}) // wait for leader timeout
for i := 0; i < 10; i++ {
leaderCount := 0
for _, node := range nodes {
if node.asRaft().closed {
continue
}
switch node.asRaft().state {
case leader:
leaderCount++
}
}
if leaderCount == 1 {
break
} else if leaderCount > 1 {
t.Errorf("get %d leaders, split brain", leaderCount)
break
}
time.Sleep(time.Second)
}
}
func TestRaftPersist(t *testing.T) {
addresses := []string{"127.0.0.1:6399", "127.0.0.1:7379", "127.0.0.1:7369"}
timeoutFlags := []bool{false, false, false}
persistFilenames := []string{
path.Join(config.Properties.Dir, "test6399.conf"),
path.Join(config.Properties.Dir, "test7379.conf"),
path.Join(config.Properties.Dir, "test7369.conf"),
}
nodes, err := makeTestRaft(addresses, timeoutFlags, persistFilenames)
if err != nil {
t.Error(err)
return
}
node1 := nodes[0].asRaft()
err = node1.persist()
if err != nil {
t.Error(err)
return
}
for _, node := range nodes {
_ = node.asRaft().Close()
}
err = node1.LoadConfigFile()
if err != nil {
t.Error(err)
return
}
}


@@ -1,99 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/datastruct/set"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
)
func (cluster *Cluster) isImportedKey(key string) bool {
slotId := getSlot(key)
cluster.slotMu.RLock()
slot := cluster.slots[slotId]
cluster.slotMu.RUnlock()
return slot.importedKeys.Has(key)
}
func (cluster *Cluster) setImportedKey(key string) {
slotId := getSlot(key)
cluster.slotMu.Lock()
slot := cluster.slots[slotId]
cluster.slotMu.Unlock()
slot.importedKeys.Add(key)
}
// initSlot initializes a slot when starting as a seed or importing a slot from another node
func (cluster *Cluster) initSlot(slotId uint32, state uint32) {
cluster.slotMu.Lock()
defer cluster.slotMu.Unlock()
cluster.slots[slotId] = &hostSlot{
importedKeys: set.Make(),
keys: set.Make(),
state: state,
}
}
func (cluster *Cluster) getHostSlot(slotId uint32) *hostSlot {
cluster.slotMu.RLock()
defer cluster.slotMu.RUnlock()
return cluster.slots[slotId]
}
func (cluster *Cluster) finishSlotImport(slotID uint32) {
cluster.slotMu.Lock()
defer cluster.slotMu.Unlock()
slot := cluster.slots[slotID]
slot.state = slotStateHost
slot.importedKeys = nil
slot.oldNodeID = ""
}
func (cluster *Cluster) setLocalSlotImporting(slotID uint32, oldNodeID string) {
cluster.slotMu.Lock()
defer cluster.slotMu.Unlock()
slot := cluster.slots[slotID]
if slot == nil {
slot = &hostSlot{
importedKeys: set.Make(),
keys: set.Make(),
}
cluster.slots[slotID] = slot
}
slot.state = slotStateImporting
slot.oldNodeID = oldNodeID
}
func (cluster *Cluster) setSlotMovingOut(slotID uint32, newNodeID string) {
cluster.slotMu.Lock()
defer cluster.slotMu.Unlock()
slot := cluster.slots[slotID]
if slot == nil {
slot = &hostSlot{
importedKeys: set.Make(),
keys: set.Make(),
}
cluster.slots[slotID] = slot
}
slot.state = slotStateMovingOut
slot.newNodeID = newNodeID
}
// cleanDroppedSlot deletes imported keys when a slot has been moved out or failed to import
func (cluster *Cluster) cleanDroppedSlot(slotID uint32) {
cluster.slotMu.RLock()
if cluster.slots[slotID] == nil {
cluster.slotMu.RUnlock()
return
}
keys := cluster.slots[slotID].importedKeys
cluster.slotMu.RUnlock()
c := connection.NewFakeConn()
go func() {
if keys != nil {
keys.ForEach(func(key string) bool {
cluster.db.Exec(c, utils.ToCmdLine("DEL", key))
return true
})
}
}()
}


@@ -1,75 +0,0 @@
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
)
func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, cmdLine)
}
func info(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, cmdLine)
}
func randomkey(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, cmdLine)
}
/*----- utils -------*/
func makeArgs(cmd string, args ...string) [][]byte {
result := make([][]byte, len(args)+1)
result[0] = []byte(cmd)
for i, arg := range args {
result[i+1] = []byte(arg)
}
return result
}
// groupBy groups the given keys by the address of the node hosting them: node addr -> keys
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string)
for _, key := range keys {
peer := cluster.pickNodeAddrByKey(key)
group, ok := result[peer]
if !ok {
group = make([]string, 0)
}
group = append(group, key)
result[peer] = group
}
return result
}
// pickNode returns the node hosting the given slot.
// If the slot is migrating, return the node which is importing the slot
func (cluster *Cluster) pickNode(slotID uint32) *Node {
	// check cluster.slots to avoid errors caused by inconsistent status on follower nodes during raft commits
// see cluster.reBalance()
hSlot := cluster.getHostSlot(slotID)
if hSlot != nil {
switch hSlot.state {
case slotStateMovingOut:
return cluster.topology.GetNode(hSlot.newNodeID)
case slotStateImporting, slotStateHost:
return cluster.topology.GetNode(cluster.self)
}
}
slot := cluster.topology.GetSlots()[int(slotID)]
node := cluster.topology.GetNode(slot.NodeID)
return node
}
func (cluster *Cluster) pickNodeAddrByKey(key string) string {
slotId := getSlot(key)
return cluster.pickNode(slotId).Addr
}
func modifyCmd(cmdLine CmdLine, newCmd string) CmdLine {
var cmdLine2 CmdLine
cmdLine2 = append(cmdLine2, cmdLine...)
cmdLine2[0] = []byte(newCmd)
return cmdLine2
}


@@ -1,159 +0,0 @@
package cluster
import (
"errors"
"github.com/hdt3213/godis/config"
database2 "github.com/hdt3213/godis/database"
"github.com/hdt3213/godis/datastruct/dict"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/idgenerator"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
"math/rand"
"sync"
)
type testClientFactory struct {
nodes []*Cluster
timeoutFlags []bool
}
type testClient struct {
targetNode *Cluster
timeoutFlag *bool
conn redis.Connection
}
func (cli *testClient) Send(cmdLine [][]byte) redis.Reply {
if *cli.timeoutFlag {
return protocol.MakeErrReply("ERR timeout")
}
return cli.targetNode.Exec(cli.conn, cmdLine)
}
func (factory *testClientFactory) GetPeerClient(peerAddr string) (peerClient, error) {
for i, n := range factory.nodes {
if n.self == peerAddr {
cli := &testClient{
targetNode: n,
timeoutFlag: &factory.timeoutFlags[i],
conn: connection.NewFakeConn(),
}
if config.Properties.RequirePass != "" {
cli.Send(utils.ToCmdLine("AUTH", config.Properties.RequirePass))
}
return cli, nil
}
}
return nil, errors.New("peer not found")
}
type mockStream struct {
targetNode *Cluster
ch <-chan *parser.Payload
}
func (s *mockStream) Stream() <-chan *parser.Payload {
return s.ch
}
func (s *mockStream) Close() error {
return nil
}
func (factory *testClientFactory) NewStream(peerAddr string, cmdLine CmdLine) (peerStream, error) {
for _, n := range factory.nodes {
if n.self == peerAddr {
conn := connection.NewFakeConn()
if config.Properties.RequirePass != "" {
n.Exec(conn, utils.ToCmdLine("AUTH", config.Properties.RequirePass))
}
result := n.Exec(conn, cmdLine)
conn.Write(result.ToBytes())
ch := parser.ParseStream(conn)
return &mockStream{
targetNode: n,
ch: ch,
}, nil
}
}
return nil, errors.New("node not found")
}
func (factory *testClientFactory) ReturnPeerClient(peer string, peerClient peerClient) error {
return nil
}
func (factory *testClientFactory) Close() error {
return nil
}
// mockClusterNodes creates a fake cluster for tests
// timeoutFlags must have the same length as addresses; setting timeoutFlags[i] == true simulates a timeout of addresses[i]
func mockClusterNodes(addresses []string, timeoutFlags []bool) []*Cluster {
nodes := make([]*Cluster, len(addresses))
// build fixedTopology
slots := make([]*Slot, slotCount)
nodeMap := make(map[string]*Node)
for _, addr := range addresses {
nodeMap[addr] = &Node{
ID: addr,
Addr: addr,
Slots: nil,
}
}
for i := range slots {
addr := addresses[i%len(addresses)]
slots[i] = &Slot{
ID: uint32(i),
NodeID: addr,
Flags: 0,
}
nodeMap[addr].Slots = append(nodeMap[addr].Slots, slots[i])
}
factory := &testClientFactory{
nodes: nodes,
timeoutFlags: timeoutFlags,
}
for i, addr := range addresses {
topo := &fixedTopology{
mu: sync.RWMutex{},
nodeMap: nodeMap,
slots: slots,
selfNodeID: addr,
}
nodes[i] = &Cluster{
self: addr,
db: database2.NewStandaloneServer(),
transactions: dict.MakeSimple(),
idGenerator: idgenerator.MakeGenerator(config.Properties.Self),
topology: topo,
clientFactory: factory,
}
}
return nodes
}
var addresses = []string{"127.0.0.1:6399", "127.0.0.1:7379"}
var timeoutFlags = []bool{false, false}
var testCluster = mockClusterNodes(addresses, timeoutFlags)
func toArgs(cmd ...string) [][]byte {
args := make([][]byte, len(cmd))
for i, s := range cmd {
args[i] = []byte(s)
}
return args
}
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
func RandString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}


@@ -44,7 +44,8 @@ type ServerProperties struct {
ClusterEnable bool `cfg:"cluster-enable"`
ClusterAsSeed bool `cfg:"cluster-as-seed"`
ClusterSeed string `cfg:"cluster-seed"`
RaftListenAddr string `cfg:"raft-listen-address"`
RaftAdvertiseAddr string `cfg:"raft-advertise-address"`
// config file path
CfPath string `cfg:"cf,omitempty"`
}
@@ -54,8 +55,11 @@ type ServerInfo struct {
}
func (p *ServerProperties) AnnounceAddress() string {
if p.AnnounceHost != "" {
return p.AnnounceHost + ":" + strconv.Itoa(p.Port)
}
return p.Bind + ":" + strconv.Itoa(p.Port)
}
// Properties holds global config properties
var Properties *ServerProperties