diff --git a/src/cluster/cluster.go b/src/cluster/cluster.go index 2b63246..a9b34b0 100644 --- a/src/cluster/cluster.go +++ b/src/cluster/cluster.go @@ -33,14 +33,24 @@ func MakeCluster() *Cluster { peerPicker: consistenthash.New(replicas, nil), peers: make(map[string]*client.Client), } - if config.Properties.Peers != nil && len(config.Properties.Peers) > 0 { - cluster.peerPicker.Add(config.Properties.Peers...) + if config.Properties.Peers != nil && len(config.Properties.Peers) > 0 && config.Properties.Self != "" { + contains := make(map[string]bool) + peers := make([]string, len(config.Properties.Peers)+1)[:] + for _, peer := range config.Properties.Peers { + if _, ok := contains[peer]; ok { + continue + } + contains[peer] = true + peers = append(peers, peer) + } + peers = append(peers, config.Properties.Self) + cluster.peerPicker.Add(peers...) } return cluster } // args contains all -type CmdFunc func(cluster *Cluster, c redis.Client, args [][]byte) redis.Reply +type CmdFunc func(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply func (cluster *Cluster) Close() { cluster.db.Close() @@ -48,7 +58,7 @@ func (cluster *Cluster) Close() { var router = MakeRouter() -func (cluster *Cluster) Exec(c redis.Client, args [][]byte) (result redis.Reply) { +func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Reply) { defer func() { if err := recover(); err != nil { logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) @@ -59,34 +69,12 @@ func (cluster *Cluster) Exec(c redis.Client, args [][]byte) (result redis.Reply) cmd := strings.ToLower(string(args[0])) cmdFunc, ok := router[cmd] if !ok { - return reply.MakeErrReply("ERR unknown command '" + cmd + "'") + return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode") } result = cmdFunc(cluster, c, args) return } -// relay command to peer -func (cluster *Cluster) Relay(key string, c redis.Client, args [][]byte) redis.Reply { - peer := cluster.peerPicker.Get(key) - if peer == cluster.self { - // to self db - return cluster.db.Exec(c, args) - } else { - peerClient, ok := cluster.peers[peer] - // lazy init - if !ok { - var err error - peerClient, err = client.MakeClient(peer) - if err != nil { - return reply.MakeErrReply(err.Error()) - } - peerClient.Start() - cluster.peers[peer] = peerClient - } - return peerClient.Send(args) - } -} - -func (cluster *Cluster) AfterClientClose(c redis.Client) { +func (cluster *Cluster) AfterClientClose(c redis.Connection) { } diff --git a/src/cluster/implements.go b/src/cluster/implements.go new file mode 100644 index 0000000..fba992e --- /dev/null +++ b/src/cluster/implements.go @@ -0,0 +1,221 @@ +package cluster + +import ( + "fmt" + "github.com/HDT3213/godis/src/interface/redis" + "github.com/HDT3213/godis/src/redis/client" + "github.com/HDT3213/godis/src/redis/reply" + "strings" +) + +func makeArgs(cmd string, args ...string) [][]byte { + result := make([][]byte, len(args)+1) + result[0] = []byte(cmd) + for i, arg := range args { + result[i+1] = []byte(arg) + } + return result +} + +func (cluster *Cluster) getPeerClient(peer string) (*client.Client, error) { + peerClient, ok := cluster.peers[peer] + // lazy init + if !ok { + var err error + peerClient, err = client.MakeClient(peer) + if err != nil { + return nil, err + } + peerClient.Start() + cluster.peers[peer] = peerClient + } + return peerClient, nil +} + +// return peer -> keys +func (cluster *Cluster) groupBy(keys []string) map[string][]string { + result := make(map[string][]string) + for _, key := range keys { + peer := cluster.peerPicker.Get(key) + group, ok := result[peer] + if !ok { + group = make([]string, 0) + } + group = append(group, key) + result[peer] = group + } + return result +} + +// relay command to peer +func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply { + if peer == cluster.self { + // to self db + return cluster.db.Exec(c, args) + } else { + peerClient, err := cluster.getPeerClient(peer) + if err != nil { + return reply.MakeErrReply(err.Error()) + } + return peerClient.Send(args) + } +} + +func defaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + key := string(args[1]) + peer := cluster.peerPicker.Get(key) + return cluster.Relay(peer, c, args) +} + +func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + if len(args) == 1 { + return &reply.PongReply{} + } else if len(args) == 2 { + return reply.MakeStatusReply("\"" + string(args[1]) + "\"") + } else { + return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command") + } +} + +// TODO: support multiplex slots +func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + if len(args) != 3 { + return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command") + } + src := string(args[1]) + dest := string(args[2]) + + srcPeer := cluster.peerPicker.Get(src) + destPeer := cluster.peerPicker.Get(dest) + + if srcPeer != destPeer { + return reply.MakeErrReply("ERR rename must within one slot in cluster mode") + } + return cluster.Relay(srcPeer, c, args) +} + +func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + if len(args) != 3 { + return reply.MakeErrReply("ERR wrong number of arguments for 'renamenx' command") + } + src := string(args[1]) + dest := string(args[2]) + + srcPeer := cluster.peerPicker.Get(src) + destPeer := cluster.peerPicker.Get(dest) + + if srcPeer != destPeer { + return reply.MakeErrReply("ERR rename must within one slot in cluster mode") + } + return cluster.Relay(srcPeer, c, args) +} + +func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + argCount := len(args) - 1 + if argCount%2 != 0 || argCount < 1 { + return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command") + } + var peer string + size := argCount / 2 + for i := 0; i < size; i++ { + key := string(args[2*i]) + currentPeer := cluster.peerPicker.Get(key) + if peer == "" { + peer = currentPeer + } else { + if peer != currentPeer { + return reply.MakeErrReply("ERR msetnx must within one slot in cluster mode") + } + } + } + return cluster.Relay(peer, c, args) +} + +// TODO: avoid part failure +func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + if len(args) < 2 { + return reply.MakeErrReply("ERR wrong number of arguments for 'del' command") + } + keys := make([]string, len(args)-1) + for i := 1; i < len(args); i++ { + keys[i-1] = string(args[i]) + } + failedKeys := make([]string, 0) + groupMap := cluster.groupBy(keys) + for peer, group := range groupMap { + resp := cluster.Relay(peer, c, makeArgs("DEL", group...)) + if reply.IsErrorReply(resp) { + failedKeys = append(failedKeys, group...) + } + } + if len(failedKeys) > 0 { + return reply.MakeErrReply("ERR part failure: " + strings.Join(failedKeys, ",")) + } + return reply.MakeIntReply(int64(len(keys))) +} + +func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + if len(args) < 2 { + return reply.MakeErrReply("ERR wrong number of arguments for 'del' command") + } + keys := make([]string, len(args)-1) + for i := 1; i < len(args); i++ { + keys[i-1] = string(args[i]) + } + + resultMap := make(map[string][]byte) + groupMap := cluster.groupBy(keys) + for peer, group := range groupMap { + resp := cluster.Relay(peer, c, makeArgs("MGET", group...)) + if reply.IsErrorReply(resp) { + errReply := resp.(reply.ErrorReply) + return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error())) + } + arrReply, _ := resp.(*reply.MultiBulkReply) + for i, v := range arrReply.Args { + key := group[i] + resultMap[key] = v + } + } + result := make([][]byte, len(keys)) + for i, k := range keys { + result[i] = resultMap[k] + } + return reply.MakeMultiBulkReply(result) +} + +func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { + argCount := len(args) - 1 + if argCount%2 != 0 || argCount < 1 { + return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command") + } + + size := argCount / 2 + keys := make([]string, size) + valueMap := make(map[string]string) + for i := 0; i < size; i++ { + keys[i] = string(args[2*i]) + valueMap[keys[i]] = string(args[2*i+1]) + } + + failedKeys := make([]string, 0) + groupMap := cluster.groupBy(keys) + for peer, groupKeys := range groupMap { + peerArgs := make([][]byte, 2*len(groupKeys)+1) + peerArgs[0] = []byte("MSET") + for i, k := range groupKeys { + peerArgs[2*i+1] = []byte(k) + value := valueMap[k] + peerArgs[2*i+2] = []byte(value) + } + resp := cluster.Relay(peer, c, peerArgs) + if reply.IsErrorReply(resp) { + failedKeys = append(failedKeys, groupKeys...) + } + } + if len(failedKeys) > 0 { + return reply.MakeErrReply("ERR part failure: " + strings.Join(failedKeys, ",")) + } + return &reply.OkReply{} + +} diff --git a/src/cluster/router.go b/src/cluster/router.go index 52d4c89..7f4d6a3 100644 --- a/src/cluster/router.go +++ b/src/cluster/router.go @@ -1,17 +1,10 @@ package cluster -import "github.com/HDT3213/godis/src/interface/redis" - -func defaultFunc(cluster *Cluster, c redis.Client, args [][]byte) redis.Reply { - key := string(args[1]) - return cluster.Relay(key, c, args) -} - func MakeRouter() map[string]CmdFunc { routerMap := make(map[string]CmdFunc) - //routerMap["ping"] = defaultFunc + routerMap["ping"] = Ping - //routerMap["del"] = Del + routerMap["del"] = Del routerMap["expire"] = defaultFunc routerMap["expireat"] = defaultFunc routerMap["pexpire"] = defaultFunc @@ -21,16 +14,16 @@ func MakeRouter() map[string]CmdFunc { routerMap["persist"] = defaultFunc routerMap["exists"] = defaultFunc routerMap["type"] = defaultFunc - //routerMap["rename"] = Rename - //routerMap["renamenx"] = RenameNx + routerMap["rename"] = Rename + routerMap["renamenx"] = RenameNx routerMap["set"] = defaultFunc routerMap["setnx"] = defaultFunc routerMap["setex"] = defaultFunc routerMap["psetex"] = defaultFunc - //routerMap["mset"] = MSet - //routerMap["mget"] = MGet - //routerMap["msetnx"] = MSetNX + routerMap["mset"] = MSet + routerMap["mget"] = MGet + routerMap["msetnx"] = MSetNX routerMap["get"] = defaultFunc routerMap["getset"] = defaultFunc routerMap["incr"] = defaultFunc diff --git a/src/cluster/string.go b/src/cluster/string.go deleted file mode 100644 index 916b1b5..0000000 --- a/src/cluster/string.go +++ /dev/null @@ -1 +0,0 @@ -package cluster diff --git a/src/db/db.go b/src/db/db.go index ce07790..b84a230 100644 --- a/src/db/db.go +++ b/src/db/db.go @@ -100,7 +100,7 @@ func (db *DB) Close() { } } -func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) { +func (db *DB) Exec(c redis.Connection, args [][]byte) (result redis.Reply) { defer func() { if err := recover(); err != nil { logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) @@ -292,6 +292,6 @@ func (db *DB) TimerTask() { /* ---- Subscribe Functions ---- */ -func (db *DB) AfterClientClose(c redis.Client) { +func (db *DB) AfterClientClose(c redis.Connection) { pubsub.UnsubscribeAll(db.hub, c) } diff --git a/src/interface/db/db.go b/src/interface/db/db.go index 6691cfe..ed6853c 100644 --- a/src/interface/db/db.go +++ b/src/interface/db/db.go @@ -3,7 +3,7 @@ package db import "github.com/HDT3213/godis/src/interface/redis" type DB interface { - Exec(client redis.Client, args [][]byte)redis.Reply - AfterClientClose(c redis.Client) + Exec(client redis.Connection, args [][]byte) redis.Reply + AfterClientClose(c redis.Connection) Close() } diff --git a/src/interface/redis/client.go b/src/interface/redis/client.go index c3621bc..4441516 100644 --- a/src/interface/redis/client.go +++ b/src/interface/redis/client.go @@ -1,6 +1,6 @@ package redis -type Client interface { +type Connection interface { Write([]byte) error // client should keep its subscribing channels diff --git a/src/lib/consistenthash/consistenthash.go b/src/lib/consistenthash/consistenthash.go index 090546b..fc9c345 100644 --- a/src/lib/consistenthash/consistenthash.go +++ b/src/lib/consistenthash/consistenthash.go @@ -4,6 +4,7 @@ import ( "hash/crc32" "sort" "strconv" + "strings" ) type HashFunc func(data []byte) uint32 @@ -33,6 +34,9 @@ func (m *Map) IsEmpty() bool { func (m *Map) Add(keys ...string) { for _, key := range keys { + if key == "" { + continue + } for i := 0; i < m.replicas; i++ { hash := int(m.hashFunc([]byte(strconv.Itoa(i) + key))) m.keys = append(m.keys, hash) @@ -42,13 +46,27 @@ func (m *Map) Add(keys ...string) { sort.Ints(m.keys) } +// support hash tag +func getPartitionKey(key string) string { + beg := strings.Index(key, "{") + if beg == -1 { + return key + } + end := strings.Index(key, "}") + if end == -1 || end == beg+1 { + return key + } + return key[beg+1 : end] +} + // Get gets the closest item in the hash to the provided key. func (m *Map) Get(key string) string { if m.IsEmpty() { return "" } - hash := int(m.hashFunc([]byte(key))) + partitionKey := getPartitionKey(key) + hash := int(m.hashFunc([]byte(partitionKey))) // Binary search for appropriate replica. idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) diff --git a/src/pubsub/pubsub.go b/src/pubsub/pubsub.go index 2220915..3c51746 100644 --- a/src/pubsub/pubsub.go +++ b/src/pubsub/pubsub.go @@ -24,7 +24,7 @@ func makeMsg(t string, channel string, code int64) []byte { * invoker should lock channel * return: is new subscribed */ -func subscribe0(hub *Hub, channel string, client redis.Client) bool { +func subscribe0(hub *Hub, channel string, client redis.Connection) bool { client.SubsChannel(channel) // add into hub.subs @@ -47,7 +47,7 @@ func subscribe0(hub *Hub, channel string, client redis.Client) bool { * invoker should lock channel * return: is actually un-subscribe */ -func unsubscribe0(hub *Hub, channel string, client redis.Client) bool { +func unsubscribe0(hub *Hub, channel string, client redis.Connection) bool { client.UnSubsChannel(channel) // remove from hub.subs @@ -65,7 +65,7 @@ func unsubscribe0(hub *Hub, channel string, client redis.Client) bool { return false } -func Subscribe(hub *Hub, c redis.Client, args [][]byte) redis.Reply { +func Subscribe(hub *Hub, c redis.Connection, args [][]byte) redis.Reply { channels := make([]string, len(args)) for i, b := range args { channels[i] = string(b) @@ -82,7 +82,7 @@ func Subscribe(hub *Hub, c redis.Client, args [][]byte) redis.Reply { return &reply.NoReply{} } -func UnsubscribeAll(hub *Hub, c redis.Client) { +func UnsubscribeAll(hub *Hub, c redis.Connection) { channels := c.GetChannels() hub.subsLocker.Locks(channels...) @@ -94,7 +94,7 @@ func UnsubscribeAll(hub *Hub, c redis.Client) { } -func UnSubscribe(db *Hub, c redis.Client, args [][]byte) redis.Reply { +func UnSubscribe(db *Hub, c redis.Connection, args [][]byte) redis.Reply { var channels []string if len(args) > 0 { channels = make([]string, len(args)) @@ -137,7 +137,7 @@ func Publish(hub *Hub, args [][]byte) redis.Reply { } subscribers, _ := raw.(*list.LinkedList) subscribers.ForEach(func(i int, c interface{}) bool { - client, _ := c.(redis.Client) + client, _ := c.(redis.Connection) replyArgs := make([][]byte, 3) replyArgs[0] = messageBytes replyArgs[1] = []byte(channel) diff --git a/src/redis/reply/reply.go b/src/redis/reply/reply.go index cc63ee0..4571fac 100644 --- a/src/redis/reply/reply.go +++ b/src/redis/reply/reply.go @@ -1,6 +1,7 @@ package reply import ( + "github.com/HDT3213/godis/src/interface/redis" "strconv" ) @@ -103,6 +104,10 @@ func MakeErrReply(status string) *StandardErrReply { } } +func IsErrorReply(reply redis.Reply) bool { + return reply.ToBytes()[0] == '-' +} + func (r *StandardErrReply) ToBytes() []byte { return []byte("-" + r.Status + "\r\n") }