Implemented new param methods and testing through resp connection in set commands package

This commit is contained in:
Kelvin Clement Mwinuka
2024-05-26 08:41:29 +08:00
parent 9f371caa4b
commit 15191dccee
4 changed files with 1659 additions and 4126 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -34,6 +34,7 @@ import (
"github.com/echovault/echovault/internal/modules/hash" "github.com/echovault/echovault/internal/modules/hash"
"github.com/echovault/echovault/internal/modules/list" "github.com/echovault/echovault/internal/modules/list"
"github.com/echovault/echovault/internal/modules/pubsub" "github.com/echovault/echovault/internal/modules/pubsub"
"github.com/echovault/echovault/internal/modules/set"
"github.com/echovault/echovault/internal/raft" "github.com/echovault/echovault/internal/raft"
"github.com/echovault/echovault/internal/snapshot" "github.com/echovault/echovault/internal/snapshot"
"io" "io"
@@ -139,7 +140,7 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
commands = append(commands, hash.Commands()...) commands = append(commands, hash.Commands()...)
commands = append(commands, list.Commands()...) commands = append(commands, list.Commands()...)
commands = append(commands, pubsub.Commands()...) commands = append(commands, pubsub.Commands()...)
// commands = append(commands, set.Commands()...) commands = append(commands, set.Commands()...)
// commands = append(commands, sorted_set.Commands()...) // commands = append(commands, sorted_set.Commands()...)
// commands = append(commands, str.Commands()...) // commands = append(commands, str.Commands()...)
return commands return commands

View File

@@ -30,27 +30,19 @@ func handleSADD(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
var set *Set var set *Set
if !params.KeyExists(params.Context, key) { if !keyExists {
set = NewSet(params.Command[2:]) set = NewSet(params.Command[2:])
if ok, err := params.CreateKeyAndLock(params.Context, key); !ok && err != nil { if err = params.SetValues(params.Context, map[string]interface{}{key: set}); err != nil {
return nil, err return nil, err
} }
if err = params.SetValue(params.Context, key, set); err != nil {
return nil, err
}
params.KeyUnlock(params.Context, key)
return []byte(fmt.Sprintf(":%d\r\n", len(params.Command[2:]))), nil return []byte(fmt.Sprintf(":%d\r\n", len(params.Command[2:]))), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -67,17 +59,13 @@ func handleSCARD(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(fmt.Sprintf(":0\r\n")), nil return []byte(fmt.Sprintf(":0\r\n")), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -93,41 +81,21 @@ func handleSDIFF(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys.ReadKeys)
// Extract base set first // Extract base set first
if !params.KeyExists(params.Context, keys.ReadKeys[0]) { if !keyExists[keys.ReadKeys[0]] {
return nil, fmt.Errorf("key for base set \"%s\" does not exist", keys.ReadKeys[0]) return nil, fmt.Errorf("key for base set \"%s\" does not exist", keys.ReadKeys[0])
} }
if _, err = params.KeyRLock(params.Context, keys.ReadKeys[0]); err != nil {
return nil, err baseSet, ok := params.GetValues(params.Context, []string{keys.ReadKeys[0]})[keys.ReadKeys[0]].(*Set)
}
defer params.KeyRUnlock(params.Context, keys.ReadKeys[0])
baseSet, ok := params.GetValue(params.Context, keys.ReadKeys[0]).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", keys.ReadKeys[0]) return nil, fmt.Errorf("value at key %s is not a set", keys.ReadKeys[0])
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys[1:] {
if !params.KeyExists(params.Context, key) {
continue
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
continue
}
locks[key] = true
}
var sets []*Set var sets []*Set
for _, key := range params.Command[2:] { for _, key := range params.Command[2:] {
set, ok := params.GetValue(params.Context, key).(*Set) set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
if !ok { if !ok {
continue continue
} }
@@ -155,42 +123,21 @@ func handleSDIFFSTORE(params internal.HandlerFuncParams) ([]byte, error) {
} }
destination := keys.WriteKeys[0] destination := keys.WriteKeys[0]
keyExists := params.KeysExist(append(keys.WriteKeys, keys.ReadKeys...))
// Extract base set first // Extract base set first
if !params.KeyExists(params.Context, keys.ReadKeys[0]) { if !keyExists[keys.ReadKeys[0]] {
return nil, fmt.Errorf("key for base set \"%s\" does not exist", keys.ReadKeys[0]) return nil, fmt.Errorf("key for base set \"%s\" does not exist", keys.ReadKeys[0])
} }
if _, err := params.KeyRLock(params.Context, keys.ReadKeys[0]); err != nil {
return nil, err baseSet, ok := params.GetValues(params.Context, []string{keys.ReadKeys[0]})[keys.ReadKeys[0]].(*Set)
}
defer params.KeyRUnlock(params.Context, keys.ReadKeys[0])
baseSet, ok := params.GetValue(params.Context, keys.ReadKeys[0]).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", keys.ReadKeys[0]) return nil, fmt.Errorf("value at key %s is not a set", keys.ReadKeys[0])
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys[1:] {
if !params.KeyExists(params.Context, key) {
continue
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
continue
}
locks[key] = true
}
var sets []*Set var sets []*Set
for _, key := range keys.ReadKeys[1:] { for _, key := range keys.ReadKeys[1:] {
set, ok := params.GetValue(params.Context, key).(*Set) set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
if !ok { if !ok {
continue continue
} }
@@ -202,24 +149,9 @@ func handleSDIFFSTORE(params internal.HandlerFuncParams) ([]byte, error) {
res := fmt.Sprintf(":%d\r\n", len(elems)) res := fmt.Sprintf(":%d\r\n", len(elems))
if params.KeyExists(params.Context, destination) { if err = params.SetValues(params.Context, map[string]interface{}{destination: diff}); err != nil {
if _, err = params.KeyLock(params.Context, destination); err != nil {
return nil, err return nil, err
} }
if err = params.SetValue(params.Context, destination, diff); err != nil {
return nil, err
}
params.KeyUnlock(params.Context, destination)
return []byte(res), nil
}
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
if err = params.SetValue(params.Context, destination, diff); err != nil {
return nil, err
}
params.KeyUnlock(params.Context, destination)
return []byte(res), nil return []byte(res), nil
} }
@@ -230,30 +162,15 @@ func handleSINTER(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool) keyExists := params.KeysExist(keys.ReadKeys)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys {
if !params.KeyExists(params.Context, key) {
// If key does not exist, then there is no intersection
return []byte("*0\r\n"), nil
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err
}
locks[key] = true
}
var sets []*Set var sets []*Set
for key, _ := range locks { for key, exists := range keyExists {
set, ok := params.GetValue(params.Context, key).(*Set) if !exists {
return []byte("*0\r\n"), nil
}
set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
if !ok { if !ok {
// If the value at the key is not a set, return error // If the value at the key is not a set, return error
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
@@ -285,6 +202,8 @@ func handleSINTERCARD(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
keyExists := params.KeysExist(keys.ReadKeys)
// Extract the limit from the command // Extract the limit from the command
var limit int var limit int
limitIdx := slices.IndexFunc(params.Command, func(s string) bool { limitIdx := slices.IndexFunc(params.Command, func(s string) bool {
@@ -306,30 +225,13 @@ func handleSINTERCARD(params internal.HandlerFuncParams) ([]byte, error) {
} }
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys {
if !params.KeyExists(params.Context, key) {
// If key does not exist, then there is no intersection
return []byte(":0\r\n"), nil
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err
}
locks[key] = true
}
var sets []*Set var sets []*Set
for key, _ := range locks { for key, exists := range keyExists {
set, ok := params.GetValue(params.Context, key).(*Set) if !exists {
return []byte(":0\r\n"), nil
}
set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
if !ok { if !ok {
// If the value at the key is not a set, return error // If the value at the key is not a set, return error
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
@@ -352,30 +254,15 @@ func handleSINTERSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool) keyExists := params.KeysExist(keys.ReadKeys)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys {
if !params.KeyExists(params.Context, key) {
// If key does not exist, then there is no intersection
return []byte(":0\r\n"), nil
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err
}
locks[key] = true
}
var sets []*Set var sets []*Set
for key, _ := range locks { for key, exists := range keyExists {
set, ok := params.GetValue(params.Context, key).(*Set) if !exists {
return []byte(":0\r\n"), err
}
set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
if !ok { if !ok {
// If the value at the key is not a set, return error // If the value at the key is not a set, return error
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
@@ -386,20 +273,9 @@ func handleSINTERSTORE(params internal.HandlerFuncParams) ([]byte, error) {
intersect, _ := Intersection(0, sets...) intersect, _ := Intersection(0, sets...)
destination := keys.WriteKeys[0] destination := keys.WriteKeys[0]
if params.KeyExists(params.Context, destination) { if err = params.SetValues(params.Context, map[string]interface{}{destination: intersect}); err != nil {
if _, err = params.KeyLock(params.Context, destination); err != nil {
return nil, err return nil, err
} }
} else {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
if err = params.SetValue(params.Context, destination, intersect); err != nil {
return nil, err
}
params.KeyUnlock(params.Context, destination)
return []byte(fmt.Sprintf(":%d\r\n", intersect.Cardinality())), nil return []byte(fmt.Sprintf(":%d\r\n", intersect.Cardinality())), nil
} }
@@ -411,17 +287,13 @@ func handleSISMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -440,17 +312,13 @@ func handleSMEMBERS(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*0\r\n"), nil return []byte("*0\r\n"), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -475,9 +343,10 @@ func handleSMISMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
members := params.Command[2:] members := params.Command[2:]
if !params.KeyExists(params.Context, key) { if !keyExists {
res := fmt.Sprintf("*%d", len(members)) res := fmt.Sprintf("*%d", len(members))
for i, _ := range members { for i, _ := range members {
res = fmt.Sprintf("%s\r\n:0", res) res = fmt.Sprintf("%s\r\n:0", res)
@@ -488,12 +357,7 @@ func handleSMISMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
return []byte(res), nil return []byte(res), nil
} }
if _, err = params.KeyRLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyRUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -518,46 +382,24 @@ func handleSMOVE(params internal.HandlerFuncParams) ([]byte, error) {
} }
source, destination := keys.WriteKeys[0], keys.WriteKeys[1] source, destination := keys.WriteKeys[0], keys.WriteKeys[1]
keyExists := params.KeysExist(keys.WriteKeys)
member := params.Command[3] member := params.Command[3]
if !params.KeyExists(params.Context, source) { if !keyExists[source] {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, source); err != nil { sets := params.GetValues(params.Context, keys.WriteKeys)
return nil, err
}
defer params.KeyUnlock(params.Context, source)
sourceSet, ok := params.GetValue(params.Context, source).(*Set) sourceSet, ok := sets[source].(*Set)
if !ok { if !ok {
return nil, errors.New("source is not a set") return nil, errors.New("source is not a set")
} }
var destinationSet *Set destinationSet, ok := sets[destination].(*Set)
if !params.KeyExists(params.Context, destination) {
// Destination key does not exist
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
defer params.KeyUnlock(params.Context, destination)
destinationSet = NewSet([]string{})
if err = params.SetValue(params.Context, destination, destinationSet); err != nil {
return nil, err
}
} else {
// Destination key exists
if _, err := params.KeyLock(params.Context, destination); err != nil {
return nil, err
}
defer params.KeyUnlock(params.Context, destination)
ds, ok := params.GetValue(params.Context, destination).(*Set)
if !ok { if !ok {
return nil, errors.New("destination is not a set") return nil, errors.New("destination is not a set")
} }
destinationSet = ds
}
res := sourceSet.Move(destinationSet, member) res := sourceSet.Move(destinationSet, member)
@@ -571,6 +413,7 @@ func handleSPOP(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
count := 1 count := 1
if len(params.Command) == 3 { if len(params.Command) == 3 {
@@ -581,16 +424,11 @@ func handleSPOP(params internal.HandlerFuncParams) ([]byte, error) {
count = c count = c
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*-1\r\n"), nil return []byte("*-1\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a set", key) return nil, fmt.Errorf("value at %s is not a set", key)
} }
@@ -615,6 +453,7 @@ func handleSRANDMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.ReadKeys[0] key := keys.ReadKeys[0]
keyExists := params.KeysExist(keys.ReadKeys)[key]
count := 1 count := 1
if len(params.Command) == 3 { if len(params.Command) == 3 {
@@ -625,16 +464,11 @@ func handleSRANDMEMBER(params internal.HandlerFuncParams) ([]byte, error) {
count = c count = c
} }
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte("*-1\r\n"), nil return []byte("*-1\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at %s is not a set", key) return nil, fmt.Errorf("value at %s is not a set", key)
} }
@@ -659,18 +493,14 @@ func handleSREM(params internal.HandlerFuncParams) ([]byte, error) {
} }
key := keys.WriteKeys[0] key := keys.WriteKeys[0]
keyExists := params.KeysExist(keys.WriteKeys)[key]
members := params.Command[2:] members := params.Command[2:]
if !params.KeyExists(params.Context, key) { if !keyExists {
return []byte(":0\r\n"), nil return []byte(":0\r\n"), nil
} }
if _, err = params.KeyLock(params.Context, key); err != nil { set, ok := params.GetValues(params.Context, []string{key})[key].(*Set)
return nil, err
}
defer params.KeyUnlock(params.Context, key)
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -686,32 +516,11 @@ func handleSUNION(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool)
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys {
if !params.KeyExists(params.Context, key) {
continue
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err
}
locks[key] = true
}
var sets []*Set var sets []*Set
for key, locked := range locks { values := params.GetValues(params.Context, keys.ReadKeys)
if !locked { for key, value := range values {
continue set, ok := value.(*Set)
}
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -737,32 +546,13 @@ func handleSUNIONSTORE(params internal.HandlerFuncParams) ([]byte, error) {
return nil, err return nil, err
} }
locks := make(map[string]bool) destination := keys.WriteKeys[0]
defer func() {
for key, locked := range locks {
if locked {
params.KeyRUnlock(params.Context, key)
}
}
}()
for _, key := range keys.ReadKeys {
if !params.KeyExists(params.Context, key) {
continue
}
if _, err = params.KeyRLock(params.Context, key); err != nil {
return nil, err
}
locks[key] = true
}
var sets []*Set var sets []*Set
for key, locked := range locks { values := params.GetValues(params.Context, keys.ReadKeys)
if !locked { for key, value := range values {
continue set, ok := value.(*Set)
}
set, ok := params.GetValue(params.Context, key).(*Set)
if !ok { if !ok {
return nil, fmt.Errorf("value at key %s is not a set", key) return nil, fmt.Errorf("value at key %s is not a set", key)
} }
@@ -771,20 +561,7 @@ func handleSUNIONSTORE(params internal.HandlerFuncParams) ([]byte, error) {
union := Union(sets...) union := Union(sets...)
destination := keys.WriteKeys[0] if err = params.SetValues(params.Context, map[string]interface{}{destination: union}); err != nil {
if params.KeyExists(params.Context, destination) {
if _, err = params.KeyLock(params.Context, destination); err != nil {
return nil, err
}
} else {
if _, err = params.CreateKeyAndLock(params.Context, destination); err != nil {
return nil, err
}
}
defer params.KeyUnlock(params.Context, destination)
if err = params.SetValue(params.Context, destination, union); err != nil {
return nil, err return nil, err
} }
return []byte(fmt.Sprintf(":%d\r\n", union.Cardinality())), nil return []byte(fmt.Sprintf(":%d\r\n", union.Cardinality())), nil

File diff suppressed because it is too large Load Diff