mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-07 09:41:05 +08:00
support some commands in cluster mode
This commit is contained in:
@@ -33,14 +33,24 @@ func MakeCluster() *Cluster {
|
||||
peerPicker: consistenthash.New(replicas, nil),
|
||||
peers: make(map[string]*client.Client),
|
||||
}
|
||||
if config.Properties.Peers != nil && len(config.Properties.Peers) > 0 {
|
||||
cluster.peerPicker.Add(config.Properties.Peers...)
|
||||
if config.Properties.Peers != nil && len(config.Properties.Peers) > 0 && config.Properties.Self != "" {
|
||||
contains := make(map[string]bool)
|
||||
peers := make([]string, len(config.Properties.Peers)+1)[:]
|
||||
for _, peer := range config.Properties.Peers {
|
||||
if _, ok := contains[peer]; ok {
|
||||
continue
|
||||
}
|
||||
contains[peer] = true
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
peers = append(peers, config.Properties.Self)
|
||||
cluster.peerPicker.Add(peers...)
|
||||
}
|
||||
return cluster
|
||||
}
|
||||
|
||||
// args contains all
|
||||
type CmdFunc func(cluster *Cluster, c redis.Client, args [][]byte) redis.Reply
|
||||
type CmdFunc func(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply
|
||||
|
||||
func (cluster *Cluster) Close() {
|
||||
cluster.db.Close()
|
||||
@@ -48,7 +58,7 @@ func (cluster *Cluster) Close() {
|
||||
|
||||
var router = MakeRouter()
|
||||
|
||||
func (cluster *Cluster) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
|
||||
func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Reply) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
|
||||
@@ -59,34 +69,12 @@ func (cluster *Cluster) Exec(c redis.Client, args [][]byte) (result redis.Reply)
|
||||
cmd := strings.ToLower(string(args[0]))
|
||||
cmdFunc, ok := router[cmd]
|
||||
if !ok {
|
||||
return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
|
||||
return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode")
|
||||
}
|
||||
result = cmdFunc(cluster, c, args)
|
||||
return
|
||||
}
|
||||
|
||||
// relay command to peer
|
||||
func (cluster *Cluster) Relay(key string, c redis.Client, args [][]byte) redis.Reply {
|
||||
peer := cluster.peerPicker.Get(key)
|
||||
if peer == cluster.self {
|
||||
// to self db
|
||||
return cluster.db.Exec(c, args)
|
||||
} else {
|
||||
peerClient, ok := cluster.peers[peer]
|
||||
// lazy init
|
||||
if !ok {
|
||||
var err error
|
||||
peerClient, err = client.MakeClient(peer)
|
||||
if err != nil {
|
||||
return reply.MakeErrReply(err.Error())
|
||||
}
|
||||
peerClient.Start()
|
||||
cluster.peers[peer] = peerClient
|
||||
}
|
||||
return peerClient.Send(args)
|
||||
}
|
||||
}
|
||||
|
||||
func (cluster *Cluster) AfterClientClose(c redis.Client) {
|
||||
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
|
||||
|
||||
}
|
||||
|
221
src/cluster/implements.go
Normal file
221
src/cluster/implements.go
Normal file
@@ -0,0 +1,221 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/HDT3213/godis/src/interface/redis"
|
||||
"github.com/HDT3213/godis/src/redis/client"
|
||||
"github.com/HDT3213/godis/src/redis/reply"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func makeArgs(cmd string, args ...string) [][]byte {
|
||||
result := make([][]byte, len(args)+1)
|
||||
result[0] = []byte(cmd)
|
||||
for i, arg := range args {
|
||||
result[i+1] = []byte(arg)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) {
|
||||
peerClient, ok := cluster.peers[peer]
|
||||
// lazy init
|
||||
if !ok {
|
||||
var err error
|
||||
peerClient, err = client.MakeClient(peer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
peerClient.Start()
|
||||
cluster.peers[peer] = peerClient
|
||||
}
|
||||
return peerClient, nil
|
||||
}
|
||||
|
||||
// return peer -> keys
|
||||
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
|
||||
result := make(map[string][]string)
|
||||
for _, key := range keys {
|
||||
peer := cluster.peerPicker.Get(key)
|
||||
group, ok := result[peer]
|
||||
if !ok {
|
||||
group = make([]string, 0)
|
||||
}
|
||||
group = append(group, key)
|
||||
result[peer] = group
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// relay command to peer
|
||||
func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if peer == cluster.self {
|
||||
// to self db
|
||||
return cluster.db.Exec(c, args)
|
||||
} else {
|
||||
peerClient, err := cluster.getPeerClient(peer)
|
||||
if err != nil {
|
||||
return reply.MakeErrReply(err.Error())
|
||||
}
|
||||
return peerClient.Send(args)
|
||||
}
|
||||
}
|
||||
|
||||
func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
key := string(args[1])
|
||||
peer := cluster.peerPicker.Get(key)
|
||||
return cluster.Relay(peer, c, args)
|
||||
}
|
||||
|
||||
func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) == 1 {
|
||||
return &reply.PongReply{}
|
||||
} else if len(args) == 2 {
|
||||
return reply.MakeStatusReply("\"" + string(args[1]) + "\"")
|
||||
} else {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: support multiplex slots
|
||||
func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
|
||||
}
|
||||
src := string(args[1])
|
||||
dest := string(args[2])
|
||||
|
||||
srcPeer := cluster.peerPicker.Get(src)
|
||||
destPeer := cluster.peerPicker.Get(dest)
|
||||
|
||||
if srcPeer != destPeer {
|
||||
return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
|
||||
}
|
||||
return cluster.Relay(srcPeer, c, args)
|
||||
}
|
||||
|
||||
func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 3 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
|
||||
}
|
||||
src := string(args[1])
|
||||
dest := string(args[2])
|
||||
|
||||
srcPeer := cluster.peerPicker.Get(src)
|
||||
destPeer := cluster.peerPicker.Get(dest)
|
||||
|
||||
if srcPeer != destPeer {
|
||||
return reply.MakeErrReply("ERR rename must within one slot in cluster mode")
|
||||
}
|
||||
return cluster.Relay(srcPeer, c, args)
|
||||
}
|
||||
|
||||
func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
argCount := len(args) - 1
|
||||
if argCount%2 != 0 || argCount < 1 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
|
||||
}
|
||||
var peer string
|
||||
size := argCount / 2
|
||||
for i := 0; i < size; i++ {
|
||||
key := string(args[2*i])
|
||||
currentPeer := cluster.peerPicker.Get(key)
|
||||
if peer == "" {
|
||||
peer = currentPeer
|
||||
} else {
|
||||
if peer != currentPeer {
|
||||
return reply.MakeErrReply("ERR msetnx must within one slot in cluster mode")
|
||||
}
|
||||
}
|
||||
}
|
||||
return cluster.Relay(peer, c, args)
|
||||
}
|
||||
|
||||
// TODO: avoid part failure
|
||||
func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
|
||||
}
|
||||
keys := make([]string, len(args)-1)
|
||||
for i := 1; i < len(args); i++ {
|
||||
keys[i-1] = string(args[i])
|
||||
}
|
||||
failedKeys := make([]string, 0)
|
||||
groupMap := cluster.groupBy(keys)
|
||||
for peer, group := range groupMap {
|
||||
resp := cluster.Relay(peer, c, makeArgs("DEL", group...))
|
||||
if reply.IsErrorReply(resp) {
|
||||
failedKeys = append(failedKeys, group...)
|
||||
}
|
||||
}
|
||||
if len(failedKeys) > 0 {
|
||||
return reply.MakeErrReply("ERR part failure: " + strings.Join(failedKeys, ","))
|
||||
}
|
||||
return reply.MakeIntReply(int64(len(keys)))
|
||||
}
|
||||
|
||||
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) < 2 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
|
||||
}
|
||||
keys := make([]string, len(args)-1)
|
||||
for i := 1; i < len(args); i++ {
|
||||
keys[i-1] = string(args[i])
|
||||
}
|
||||
|
||||
resultMap := make(map[string][]byte)
|
||||
groupMap := cluster.groupBy(keys)
|
||||
for peer, group := range groupMap {
|
||||
resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
|
||||
if reply.IsErrorReply(resp) {
|
||||
errReply := resp.(reply.ErrorReply)
|
||||
return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
|
||||
}
|
||||
arrReply, _ := resp.(*reply.MultiBulkReply)
|
||||
for i, v := range arrReply.Args {
|
||||
key := group[i]
|
||||
resultMap[key] = v
|
||||
}
|
||||
}
|
||||
result := make([][]byte, len(keys))
|
||||
for i, k := range keys {
|
||||
result[i] = resultMap[k]
|
||||
}
|
||||
return reply.MakeMultiBulkReply(result)
|
||||
}
|
||||
|
||||
func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
argCount := len(args) - 1
|
||||
if argCount%2 != 0 || argCount < 1 {
|
||||
return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
|
||||
}
|
||||
|
||||
size := argCount / 2
|
||||
keys := make([]string, size)
|
||||
valueMap := make(map[string]string)
|
||||
for i := 0; i < size; i++ {
|
||||
keys[i] = string(args[2*i])
|
||||
valueMap[keys[i]] = string(args[2*i+1])
|
||||
}
|
||||
|
||||
failedKeys := make([]string, 0)
|
||||
groupMap := cluster.groupBy(keys)
|
||||
for peer, groupKeys := range groupMap {
|
||||
peerArgs := make([][]byte, 2*len(groupKeys)+1)
|
||||
peerArgs[0] = []byte("MSET")
|
||||
for i, k := range groupKeys {
|
||||
peerArgs[2*i+1] = []byte(k)
|
||||
value := valueMap[k]
|
||||
peerArgs[2*i+2] = []byte(value)
|
||||
}
|
||||
resp := cluster.Relay(peer, c, peerArgs)
|
||||
if reply.IsErrorReply(resp) {
|
||||
failedKeys = append(failedKeys, groupKeys...)
|
||||
}
|
||||
}
|
||||
if len(failedKeys) > 0 {
|
||||
return reply.MakeErrReply("ERR part failure: " + strings.Join(failedKeys, ","))
|
||||
}
|
||||
return &reply.OkReply{}
|
||||
|
||||
}
|
@@ -1,17 +1,10 @@
|
||||
package cluster
|
||||
|
||||
import "github.com/HDT3213/godis/src/interface/redis"
|
||||
|
||||
func defaultFunc(cluster *Cluster, c redis.Client, args [][]byte) redis.Reply {
|
||||
key := string(args[1])
|
||||
return cluster.Relay(key, c, args)
|
||||
}
|
||||
|
||||
func MakeRouter() map[string]CmdFunc {
|
||||
routerMap := make(map[string]CmdFunc)
|
||||
//routerMap["ping"] = defaultFunc
|
||||
routerMap["ping"] = Ping
|
||||
|
||||
//routerMap["del"] = Del
|
||||
routerMap["del"] = Del
|
||||
routerMap["expire"] = defaultFunc
|
||||
routerMap["expireat"] = defaultFunc
|
||||
routerMap["pexpire"] = defaultFunc
|
||||
@@ -21,16 +14,16 @@ func MakeRouter() map[string]CmdFunc {
|
||||
routerMap["persist"] = defaultFunc
|
||||
routerMap["exists"] = defaultFunc
|
||||
routerMap["type"] = defaultFunc
|
||||
//routerMap["rename"] = Rename
|
||||
//routerMap["renamenx"] = RenameNx
|
||||
routerMap["rename"] = Rename
|
||||
routerMap["renamenx"] = RenameNx
|
||||
|
||||
routerMap["set"] = defaultFunc
|
||||
routerMap["setnx"] = defaultFunc
|
||||
routerMap["setex"] = defaultFunc
|
||||
routerMap["psetex"] = defaultFunc
|
||||
//routerMap["mset"] = MSet
|
||||
//routerMap["mget"] = MGet
|
||||
//routerMap["msetnx"] = MSetNX
|
||||
routerMap["mset"] = MSet
|
||||
routerMap["mget"] = MGet
|
||||
routerMap["msetnx"] = MSetNX
|
||||
routerMap["get"] = defaultFunc
|
||||
routerMap["getset"] = defaultFunc
|
||||
routerMap["incr"] = defaultFunc
|
||||
|
@@ -1 +0,0 @@
|
||||
package cluster
|
@@ -100,7 +100,7 @@ func (db *DB) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
|
||||
func (db *DB) Exec(c redis.Connection, args [][]byte) (result redis.Reply) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
|
||||
@@ -292,6 +292,6 @@ func (db *DB) TimerTask() {
|
||||
|
||||
/* ---- Subscribe Functions ---- */
|
||||
|
||||
func (db *DB) AfterClientClose(c redis.Client) {
|
||||
func (db *DB) AfterClientClose(c redis.Connection) {
|
||||
pubsub.UnsubscribeAll(db.hub, c)
|
||||
}
|
||||
|
@@ -3,7 +3,7 @@ package db
|
||||
import "github.com/HDT3213/godis/src/interface/redis"
|
||||
|
||||
type DB interface {
|
||||
Exec(client redis.Client, args [][]byte)redis.Reply
|
||||
AfterClientClose(c redis.Client)
|
||||
Exec(client redis.Connection, args [][]byte) redis.Reply
|
||||
AfterClientClose(c redis.Connection)
|
||||
Close()
|
||||
}
|
||||
|
@@ -1,6 +1,6 @@
|
||||
package redis
|
||||
|
||||
type Client interface {
|
||||
type Connection interface {
|
||||
Write([]byte) error
|
||||
|
||||
// client should keep its subscribing channels
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"hash/crc32"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type HashFunc func(data []byte) uint32
|
||||
@@ -33,6 +34,9 @@ func (m *Map) IsEmpty() bool {
|
||||
|
||||
func (m *Map) Add(keys ...string) {
|
||||
for _, key := range keys {
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
for i := 0; i < m.replicas; i++ {
|
||||
hash := int(m.hashFunc([]byte(strconv.Itoa(i) + key)))
|
||||
m.keys = append(m.keys, hash)
|
||||
@@ -42,13 +46,27 @@ func (m *Map) Add(keys ...string) {
|
||||
sort.Ints(m.keys)
|
||||
}
|
||||
|
||||
// support hash tag
|
||||
func getPartitionKey(key string) string {
|
||||
beg := strings.Index(key, "{")
|
||||
if beg == -1 {
|
||||
return key
|
||||
}
|
||||
end := strings.Index(key, "}")
|
||||
if end == -1 || end == beg+1 {
|
||||
return key
|
||||
}
|
||||
return key[beg+1 : end]
|
||||
}
|
||||
|
||||
// Get gets the closest item in the hash to the provided key.
|
||||
func (m *Map) Get(key string) string {
|
||||
if m.IsEmpty() {
|
||||
return ""
|
||||
}
|
||||
|
||||
hash := int(m.hashFunc([]byte(key)))
|
||||
partitionKey := getPartitionKey(key)
|
||||
hash := int(m.hashFunc([]byte(partitionKey)))
|
||||
|
||||
// Binary search for appropriate replica.
|
||||
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
|
||||
|
@@ -24,7 +24,7 @@ func makeMsg(t string, channel string, code int64) []byte {
|
||||
* invoker should lock channel
|
||||
* return: is new subscribed
|
||||
*/
|
||||
func subscribe0(hub *Hub, channel string, client redis.Client) bool {
|
||||
func subscribe0(hub *Hub, channel string, client redis.Connection) bool {
|
||||
client.SubsChannel(channel)
|
||||
|
||||
// add into hub.subs
|
||||
@@ -47,7 +47,7 @@ func subscribe0(hub *Hub, channel string, client redis.Client) bool {
|
||||
* invoker should lock channel
|
||||
* return: is actually un-subscribe
|
||||
*/
|
||||
func unsubscribe0(hub *Hub, channel string, client redis.Client) bool {
|
||||
func unsubscribe0(hub *Hub, channel string, client redis.Connection) bool {
|
||||
client.UnSubsChannel(channel)
|
||||
|
||||
// remove from hub.subs
|
||||
@@ -65,7 +65,7 @@ func unsubscribe0(hub *Hub, channel string, client redis.Client) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func Subscribe(hub *Hub, c redis.Client, args [][]byte) redis.Reply {
|
||||
func Subscribe(hub *Hub, c redis.Connection, args [][]byte) redis.Reply {
|
||||
channels := make([]string, len(args))
|
||||
for i, b := range args {
|
||||
channels[i] = string(b)
|
||||
@@ -82,7 +82,7 @@ func Subscribe(hub *Hub, c redis.Client, args [][]byte) redis.Reply {
|
||||
return &reply.NoReply{}
|
||||
}
|
||||
|
||||
func UnsubscribeAll(hub *Hub, c redis.Client) {
|
||||
func UnsubscribeAll(hub *Hub, c redis.Connection) {
|
||||
channels := c.GetChannels()
|
||||
|
||||
hub.subsLocker.Locks(channels...)
|
||||
@@ -94,7 +94,7 @@ func UnsubscribeAll(hub *Hub, c redis.Client) {
|
||||
|
||||
}
|
||||
|
||||
func UnSubscribe(db *Hub, c redis.Client, args [][]byte) redis.Reply {
|
||||
func UnSubscribe(db *Hub, c redis.Connection, args [][]byte) redis.Reply {
|
||||
var channels []string
|
||||
if len(args) > 0 {
|
||||
channels = make([]string, len(args))
|
||||
@@ -137,7 +137,7 @@ func Publish(hub *Hub, args [][]byte) redis.Reply {
|
||||
}
|
||||
subscribers, _ := raw.(*list.LinkedList)
|
||||
subscribers.ForEach(func(i int, c interface{}) bool {
|
||||
client, _ := c.(redis.Client)
|
||||
client, _ := c.(redis.Connection)
|
||||
replyArgs := make([][]byte, 3)
|
||||
replyArgs[0] = messageBytes
|
||||
replyArgs[1] = []byte(channel)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package reply
|
||||
|
||||
import (
|
||||
"github.com/HDT3213/godis/src/interface/redis"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
@@ -103,6 +104,10 @@ func MakeErrReply(status string) *StandardErrReply {
|
||||
}
|
||||
}
|
||||
|
||||
func IsErrorReply(reply redis.Reply) bool {
|
||||
return reply.ToBytes()[0] == '-'
|
||||
}
|
||||
|
||||
func (r *StandardErrReply) ToBytes() []byte {
|
||||
return []byte("-" + r.Status + "\r\n")
|
||||
}
|
||||
|
Reference in New Issue
Block a user