Updated sorted_set commands package to use new keyspace methods and its tests to use resp connection instead of calling handler directly

This commit is contained in:
Kelvin Clement Mwinuka
2024-05-26 14:48:13 +08:00
parent 15191dccee
commit edf82886bd
4 changed files with 1996 additions and 1570 deletions

View File

@@ -35,6 +35,7 @@ import (
"github.com/echovault/echovault/internal/modules/list" "github.com/echovault/echovault/internal/modules/list"
"github.com/echovault/echovault/internal/modules/pubsub" "github.com/echovault/echovault/internal/modules/pubsub"
"github.com/echovault/echovault/internal/modules/set" "github.com/echovault/echovault/internal/modules/set"
"github.com/echovault/echovault/internal/modules/sorted_set"
"github.com/echovault/echovault/internal/raft" "github.com/echovault/echovault/internal/raft"
"github.com/echovault/echovault/internal/snapshot" "github.com/echovault/echovault/internal/snapshot"
"io" "io"
@@ -141,7 +142,7 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
commands = append(commands, list.Commands()...) commands = append(commands, list.Commands()...)
commands = append(commands, pubsub.Commands()...) commands = append(commands, pubsub.Commands()...)
commands = append(commands, set.Commands()...) commands = append(commands, set.Commands()...)
// commands = append(commands, sorted_set.Commands()...) commands = append(commands, sorted_set.Commands()...)
// commands = append(commands, str.Commands()...) // commands = append(commands, str.Commands()...)
return commands return commands
}(), }(),

View File

@@ -33,6 +33,7 @@ func handleZADD(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
var updatePolicy interface{} = nil var updatePolicy interface{} = nil
var comparison interface{} = nil var comparison interface{} = nil
@@ -139,14 +140,9 @@ func handleZADD(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
if params.KeyExists(params.Context, key) { if keyExists {
// Key exists // Key exists
_, err = params.KeyLock(params.Context, key) set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
if err != nil {
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -163,14 +159,9 @@ func handleZADD(params internal.HandlerFuncParams) ([]byte, error) {
return []byte(fmt.Sprintf(":%d\r\n", count)), nil return []byte(fmt.Sprintf(":%d\r\n", count)), nil
} }
// Key does not exist // Key does not exist.
if _, err = params.CreateKeyAndLock(params.Context, key); err != nil {
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set := NewSortedSet(members) set := NewSortedSet(members)
if err = params.SetValue(params.Context, key, set); err != nil { if err = params.SetValues(params.Context, map[string]interface{}{key: set}); err != nil {
return nil, err return nil, err
} }
@@ -182,18 +173,15 @@ func handleZCARD(params internal.HandlerFuncParams) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
key := keys.ReadKeys[0]
if !params.KeyExists(params.Context, key) { key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -208,6 +196,7 @@ func handleZCOUNT(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
minimum := Score(math.Inf(-1)) minimum := Score(math.Inf(-1))
switch internal.AdaptType(params.Command[2]).(type) { switch internal.AdaptType(params.Command[2]).(type) {
@@ -245,16 +234,11 @@ func handleZCOUNT(params internal.HandlerFuncParams) ([]byte, error) {
maximum = Score(s) maximum = Score(s)
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -276,19 +260,15 @@ func handleZLEXCOUNT(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
minimum := params.Command[2] minimum := params.Command[2]
maximum := params.Command[3] maximum := params.Command[3]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -320,6 +300,8 @@ func handleZDIFF(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys.ReadKeys)
withscoresIndex := slices.IndexFunc(params.Command, func(s string) bool { withscoresIndex := slices.IndexFunc(params.Command, func(s string) bool {
return strings.EqualFold(s, "withscores") return strings.EqualFold(s, "withscores")
}) })
@@ -327,25 +309,13 @@ func handleZDIFF(params internal.HandlerFuncParams) ([]byte, error) {
return nil, errors.New(constants.WrongArgsResponse) return nil, errors.New(constants.WrongArgsResponse)
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
// Extract base set // Extract base set
if !params.KeyExists(params.Context, keys.ReadKeys[0]) { if !keyExists[keys.ReadKeys[0]] {
// If base set does not exist, return an empty array // If base set does not exist, return an empty array
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, keys.ReadKeys[0]); err != nil {
return nil, err baseSortedSet, ok := params.GetValues(params.Context, []string{keys.ReadKeys[0]})[keys.ReadKeys[0]].(*SortedSet)
}
defer params.KeyRUnlock(params.Context, keys.ReadKeys[0])
baseSortedSet, ok := params.GetValue(params.Context, keys.ReadKeys[0]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[0]) return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[0])
} }
@@ -354,15 +324,10 @@ func handleZDIFF(params internal.HandlerFuncParams) ([]byte, error) {
var sets []*SortedSet var sets []*SortedSet
for i := 1; i < len(keys.ReadKeys); i++ { for i := 1; i < len(keys.ReadKeys); i++ {
if !params.KeyExists(params.Context, keys.ReadKeys[i]) { if !keyExists[keys.ReadKeys[i]] {
continue continue
} }
locked, err := params.KeyRLock(params.Context, keys.ReadKeys[i]) set, ok := params.GetValues(params.Context, []string{keys.ReadKeys[i]})[keys.ReadKeys[i]].(*SortedSet)
if err != nil {
return nil, err
}
locks[keys.ReadKeys[i]] = locked
set, ok := params.GetValue(params.Context, keys.ReadKeys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[i])
} }
@@ -376,7 +341,8 @@ func handleZDIFF(params internal.HandlerFuncParams) ([]byte, error) {
for _, m := range diff.GetAll() { for _, m := range diff.GetAll() {
if includeScores { if includeScores {
res += fmt.Sprintf("\r\n*2\r\n$%d\r\n%s\r\n+%s", len(m.Value), m.Value, strconv.FormatFloat(float64(m.Score), 'f', -1, 64)) res += fmt.Sprintf("\r\n*2\r\n$%d\r\n%s\r\n+%s",
len(m.Value), m.Value, strconv.FormatFloat(float64(m.Score), 'f', -1, 64))
} else { } else {
res += fmt.Sprintf("\r\n*1\r\n$%d\r\n%s", len(m.Value), m.Value) res += fmt.Sprintf("\r\n*1\r\n$%d\r\n%s", len(m.Value), m.Value)
} }
@@ -393,27 +359,16 @@ func handleZDIFFSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys.ReadKeys)
destination := keys.WriteKeys[0] destination := keys.WriteKeys[0]
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
// Extract base set // Extract base set
if !params.KeyExists(params.Context, keys.ReadKeys[0]) { if !keyExists[keys.ReadKeys[0]] {
// If base set does not exist, return 0 // If base set does not exist, return 0
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, keys.ReadKeys[0]); err != nil {
return nil, err baseSortedSet, ok := params.GetValues(params.Context, []string{keys.ReadKeys[0]})[keys.ReadKeys[0]].(*SortedSet)
}
defer params.KeyRUnlock(params.Context, keys.ReadKeys[0])
baseSortedSet, ok := params.GetValue(params.Context, keys.ReadKeys[0]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[0]) return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[0])
} }
@@ -421,11 +376,8 @@ func handleZDIFFSTORE(params internal.HandlerFuncParams) ([]byte, error) {
var sets []*SortedSet var sets []*SortedSet
for i := 1; i < len(keys.ReadKeys); i++ { for i := 1; i < len(keys.ReadKeys); i++ {
if params.KeyExists(params.Context, keys.ReadKeys[i]) { if keyExists[keys.ReadKeys[i]] {
if _, err = params.KeyRLock(params.Context, keys.ReadKeys[i]); err != nil { set, ok := params.GetValues(params.Context, []string{keys.ReadKeys[i]})[keys.ReadKeys[i]].(*SortedSet)
return nil, err
}
set, ok := params.GetValue(params.Context, keys.ReadKeys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys.ReadKeys[i])
} }
@@ -434,19 +386,7 @@ func handleZDIFFSTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
diff := baseSortedSet.Subtract(sets) diff := baseSortedSet.Subtract(sets)
if err = params.SetValues(params.Context, map[string]interface{}{destination: diff}); err != nil {
if params.KeyExists(params.Context, destination) {
if _, err = params.KeyLock(params.Context, destination); err != nil {
return nil, err
}
} else {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
defer params.KeyUnlock(params.Context, destination)
if err = params.SetValue(params.Context, destination, diff); err != nil {
return nil, err return nil, err
} }
@@ -460,6 +400,8 @@ func handleZINCRBY(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
member := Value(params.Command[3]) member := Value(params.Command[3])
var increment Score var increment Score
@@ -482,28 +424,21 @@ func handleZINCRBY(params internal.HandlerFuncParams) ([]byte, error) {
increment = Score(s) increment = Score(s)
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
// If the key does not exist, create a new sorted set at the key with // If the key does not exist, create a new sorted set at the key with
// the member and increment as the first value // the member and increment as the first value
if _, err = params.CreateKeyAndLock(params.Context, key); err != nil { if err = params.SetValues(
return nil, err
}
if err = params.SetValue(
params.Context, params.Context,
key, map[string]interface{}{
NewSortedSet([]MemberParam{{Value: member, Score: increment}}), key: NewSortedSet([]MemberParam{{Value: member, Score: increment}}),
},
); err != nil { ); err != nil {
return nil, err return nil, err
} }
params.KeyUnlock(params.Context, key)
return []byte(fmt.Sprintf("+%s\r\n", strconv.FormatFloat(float64(increment), 'f', -1, 64))), nil return []byte(fmt.Sprintf("+%s\r\n", strconv.FormatFloat(float64(increment), 'f', -1, 64))), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -530,28 +465,17 @@ func handleZINTER(params internal.HandlerFuncParams) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys)
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
var setParams []SortedSetParam var setParams []SortedSetParam
values := params.GetValues(params.Context, keys)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
if !params.KeyExists(params.Context, keys[i]) { if !keyExists[keys[i]] {
// If any of the keys is non-existent, return an empty array as there's no intersect // If any of the keys is non-existent, return an empty array as there's no intersect
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, keys[i]); err != nil { set, ok := values[keys[i]].(*SortedSet)
return nil, err
}
locks[keys[i]] = true
set, ok := params.GetValue(params.Context, keys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys[i])
} }
@@ -586,6 +510,7 @@ func handleZINTERSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(k.ReadKeys)
destination := k.WriteKeys[0] destination := k.WriteKeys[0]
// Remove the destination keys from the command before parsing it // Remove the destination keys from the command before parsing it
@@ -598,26 +523,14 @@ func handleZINTERSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
var setParams []SortedSetParam var setParams []SortedSetParam
values := params.GetValues(params.Context, keys)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
if !params.KeyExists(params.Context, keys[i]) { if !keyExists[keys[i]] {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, keys[i]); err != nil { set, ok := values[keys[i]].(*SortedSet)
return nil, err
}
locks[keys[i]] = true
set, ok := params.GetValue(params.Context, keys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys[i])
} }
@@ -628,19 +541,9 @@ func handleZINTERSTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
intersect := Intersect(aggregate, setParams...) intersect := Intersect(aggregate, setParams...)
if err = params.SetValues(params.Context, map[string]interface{}{
if params.KeyExists(params.Context, destination) && intersect.Cardinality() > 0 { destination: intersect,
if _, err = params.KeyLock(params.Context, destination); err != nil { }); err != nil {
return nil, err
}
} else if intersect.Cardinality() > 0 {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
defer params.KeyUnlock(params.Context, destination)
if err = params.SetValue(params.Context, destination, intersect); err != nil {
return nil, err return nil, err
} }
@@ -653,6 +556,8 @@ func handleZMPOP(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys.WriteKeys)
count := 1 count := 1
policy := "min" policy := "min"
modifierIdx := -1 modifierIdx := -1
@@ -694,21 +599,15 @@ func handleZMPOP(params internal.HandlerFuncParams) ([]byte, error) {
} }
for i := 0; i < len(keys.WriteKeys); i++ { for i := 0; i < len(keys.WriteKeys); i++ {
if params.KeyExists(params.Context, keys.WriteKeys[i]) { if keyExists[keys.WriteKeys[i]] {
if _, err = params.KeyLock(params.Context, keys.WriteKeys[i]); err != nil { v, ok := params.GetValues(params.Context, []string{keys.WriteKeys[i]})[keys.WriteKeys[i]].(*SortedSet)
continue
}
v, ok := params.GetValue(params.Context, keys.WriteKeys[i]).(*SortedSet)
if !ok || v.Cardinality() == 0 { if !ok || v.Cardinality() == 0 {
params.KeyUnlock(params.Context, keys.WriteKeys[i])
continue continue
} }
popped, err := v.Pop(count, policy) popped, err := v.Pop(count, policy)
if err != nil { if err != nil {
params.KeyUnlock(params.Context, keys.WriteKeys[i])
return nil, err return nil, err
} }
params.KeyUnlock(params.Context, keys.WriteKeys[i])
res := fmt.Sprintf("*%d", popped.Cardinality()) res := fmt.Sprintf("*%d", popped.Cardinality())
@@ -732,6 +631,7 @@ func handleZPOP(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
count := 1 count := 1
policy := "min" policy := "min"
@@ -749,16 +649,11 @@ func handleZPOP(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a sorted set", key) return nil, fmt.Errorf("value at key %s is not a sorted set", key)
} }
@@ -770,7 +665,8 @@ func handleZPOP(params internal.HandlerFuncParams) ([]byte, error) {
res := fmt.Sprintf("*%d", popped.Cardinality()) res := fmt.Sprintf("*%d", popped.Cardinality())
for _, m := range popped.GetAll() { for _, m := range popped.GetAll() {
res += fmt.Sprintf("\r\n*2\r\n$%d\r\n%s\r\n+%s", len(m.Value), m.Value, strconv.FormatFloat(float64(m.Score), 'f', -1, 64)) res += fmt.Sprintf("\r\n*2\r\n$%d\r\n%s\r\n+%s",
len(m.Value), m.Value, strconv.FormatFloat(float64(m.Score), 'f', -1, 64))
} }
res += "\r\n" res += "\r\n"
@@ -785,17 +681,13 @@ func handleZMSCORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -827,6 +719,7 @@ func handleZRANDMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
count := 1 count := 1
if len(params.Command) >= 3 { if len(params.Command) >= 3 {
@@ -848,16 +741,11 @@ func handleZRANDMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("$-1\r\n"), nil return []byte("$-1\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -885,6 +773,7 @@ func handleZRANK(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
member := params.Command[2] member := params.Command[2]
withscores := false withscores := false
@@ -892,16 +781,11 @@ func handleZRANK(params internal.HandlerFuncParams) ([]byte, error) {
withscores = true withscores = true
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("$-1\r\n"), nil return []byte("$-1\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -935,17 +819,13 @@ func handleZREM(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -967,15 +847,13 @@ func handleZSCORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("$-1\r\n"), nil return []byte("$-1\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -996,6 +874,7 @@ func handleZREMRANGEBYSCORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
deletedCount := 0 deletedCount := 0
@@ -1009,16 +888,11 @@ func handleZREMRANGEBYSCORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -1040,6 +914,7 @@ func handleZREMRANGEBYRANK(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
start, err := strconv.Atoi(params.Command[2]) start, err := strconv.Atoi(params.Command[2])
if err != nil { if err != nil {
@@ -1051,16 +926,11 @@ func handleZREMRANGEBYRANK(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -1105,19 +975,15 @@ func handleZREMRANGEBYLEX(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
minimum := params.Command[2] minimum := params.Command[2]
maximum := params.Command[3] maximum := params.Command[3]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -1152,6 +1018,8 @@ func handleZRANGE(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
policy := "byscore" policy := "byscore"
scoreStart := math.Inf(-1) // Lower bound if policy is "byscore" scoreStart := math.Inf(-1) // Lower bound if policy is "byscore"
scoreStop := math.Inf(1) // Upper bound if policy is "byscore" scoreStop := math.Inf(1) // Upper bound if policy is "byscore"
@@ -1206,16 +1074,11 @@ func handleZRANGE(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", key) return nil, fmt.Errorf("value at %s is not a sorted set", key)
} }
@@ -1293,6 +1156,7 @@ func handleZRANGESTORE(params internal.HandlerFuncParams) ([]byte, error) {
destination := keys.WriteKeys[0] destination := keys.WriteKeys[0]
source := keys.ReadKeys[0] source := keys.ReadKeys[0]
sourceExists := params.KeysExist(keys.ReadKeys)[source]
policy := "byscore" policy := "byscore"
scoreStart := math.Inf(-1) // Lower bound if policy is "byscore" scoreStart := math.Inf(-1) // Lower bound if policy is "byscore"
scoreStop := math.Inf(1) // Upper bound if policy is "byfloat" scoreStop := math.Inf(1) // Upper bound if policy is "byfloat"
@@ -1343,16 +1207,11 @@ func handleZRANGESTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
if !params.KeyExists(params.Context, source) { if !sourceExists {
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, source); err != nil { set, ok := params.GetValues(params.Context, []string{source})[source].(*SortedSet)
return nil, err
}
defer params.KeyRUnlock(params.Context, source)
set, ok := params.GetValue(params.Context, source).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", source) return nil, fmt.Errorf("value at %s is not a sorted set", source)
} }
@@ -1408,19 +1267,9 @@ func handleZRANGESTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
newSortedSet := NewSortedSet(resultMembers) newSortedSet := NewSortedSet(resultMembers)
if err = params.SetValues(params.Context, map[string]interface{}{
if params.KeyExists(params.Context, destination) { destination: newSortedSet,
if _, err = params.KeyLock(params.Context, destination); err != nil { }); err != nil {
return nil, err
}
} else {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
defer params.KeyUnlock(params.Context, destination)
if err = params.SetValue(params.Context, destination, newSortedSet); err != nil {
return nil, err return nil, err
} }
@@ -1437,24 +1286,14 @@ func handleZUNION(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool) keyExists := params.KeysExist(keys)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
var setParams []SortedSetParam var setParams []SortedSetParam
values := params.GetValues(params.Context, keys)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
if params.KeyExists(params.Context, keys[i]) { if keyExists[keys[i]] {
if _, err = params.KeyRLock(params.Context, keys[i]); err != nil { set, ok := values[keys[i]].(*SortedSet)
return nil, err
}
locks[keys[i]] = true
set, ok := params.GetValue(params.Context, keys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys[i])
} }
@@ -1499,24 +1338,14 @@ func handleZUNIONSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool) keyExists := params.KeysExist(keys)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
var setParams []SortedSetParam var setParams []SortedSetParam
values := params.GetValues(params.Context, keys)
for i := 0; i < len(keys); i++ { for i := 0; i < len(keys); i++ {
if params.KeyExists(params.Context, keys[i]) { if keyExists[keys[i]] {
if _, err = params.KeyRLock(params.Context, keys[i]); err != nil { set, ok := values[keys[i]].(*SortedSet)
return nil, err
}
locks[keys[i]] = true
set, ok := params.GetValue(params.Context, keys[i]).(*SortedSet)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a sorted set", keys[i]) return nil, fmt.Errorf("value at %s is not a sorted set", keys[i])
} }
@@ -1528,19 +1357,9 @@ func handleZUNIONSTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
union := Union(aggregate, setParams...) union := Union(aggregate, setParams...)
if err = params.SetValues(params.Context, map[string]interface{}{
if params.KeyExists(params.Context, destination) { destination: union,
if _, err = params.KeyLock(params.Context, destination); err != nil { }); err != nil {
return nil, err
}
} else {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
defer params.KeyUnlock(params.Context, destination)
if err = params.SetValue(params.Context, destination, union); err != nil {
return nil, err return nil, err
} }

File diff suppressed because it is too large Load Diff

View File

@@ -135,13 +135,11 @@ func zinterstoreKeyFunc(cmd []string) (internal.KeyExtractionFuncResult, error)
return internal.KeyExtractionFuncResult{}, errors.New(constants.WrongArgsResponse) return internal.KeyExtractionFuncResult{}, errors.New(constants.WrongArgsResponse)
} }
endIdx := slices.IndexFunc(cmd[1:], func(s string) bool { endIdx := slices.IndexFunc(cmd[1:], func(s string) bool {
if strings.EqualFold(s, "WEIGHTS") || return strings.EqualFold(s, "WEIGHTS") ||
strings.EqualFold(s, "AGGREGATE") || strings.EqualFold(s, "AGGREGATE") ||
strings.EqualFold(s, "WITHSCORES") { strings.EqualFold(s, "WITHSCORES")
return true
}
return false
}) })
if endIdx == -1 { if endIdx == -1 {
return internal.KeyExtractionFuncResult{ return internal.KeyExtractionFuncResult{
Channels: make([]string, 0), Channels: make([]string, 0),
@@ -149,13 +147,15 @@ func zinterstoreKeyFunc(cmd []string) (internal.KeyExtractionFuncResult, error)
WriteKeys: cmd[1:2], WriteKeys: cmd[1:2],
}, nil }, nil
} }
if endIdx >= 3 { if endIdx >= 3 {
return internal.KeyExtractionFuncResult{ return internal.KeyExtractionFuncResult{
Channels: make([]string, 0), Channels: make([]string, 0),
ReadKeys: cmd[2:endIdx], ReadKeys: cmd[2 : endIdx+1],
WriteKeys: cmd[1:2], WriteKeys: cmd[1:2],
}, nil }, nil
} }
return internal.KeyExtractionFuncResult{}, errors.New(constants.WrongArgsResponse) return internal.KeyExtractionFuncResult{}, errors.New(constants.WrongArgsResponse)
} }
@@ -377,7 +377,7 @@ func zunionstoreKeyFunc(cmd []string) (internal.KeyExtractionFuncResult, error)
if endIdx >= 1 { if endIdx >= 1 {
return internal.KeyExtractionFuncResult{ return internal.KeyExtractionFuncResult{
Channels: make([]string, 0), Channels: make([]string, 0),
ReadKeys: cmd[2:endIdx], ReadKeys: cmd[2 : endIdx+1],
WriteKeys: cmd[1:2], WriteKeys: cmd[1:2],
}, nil }, nil
} }