Implemented divide & conquer function for calculating the union of multiple sets. Implemented unit test for ZUNION command handler

This commit is contained in:
Kelvin Clement Mwinuka
2024-02-21 01:04:34 +08:00
parent 0e657baa2e
commit ed735b9dbd
4 changed files with 475 additions and 37 deletions

View File

@@ -1419,8 +1419,8 @@ func handleZRANGESTORE(ctx context.Context, cmd []string, server utils.Server, c
}
func handleZUNION(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
if len(cmd) < 2 {
return nil, errors.New(utils.WRONG_ARGS_RESPONSE)
if _, err := zunionKeyFunc(cmd); err != nil {
return nil, err
}
keys, weights, aggregate, withscores, err := extractKeysWeightsAggregateWithScores(cmd)
@@ -1437,48 +1437,38 @@ func handleZUNION(ctx context.Context, cmd []string, server utils.Server, conn *
}
}()
var sets []*SortedSet
var setParams []SortedSetParam
for _, key := range keys {
if server.KeyExists(key) {
_, err := server.KeyRLock(ctx, key)
if err != nil {
for i := 0; i < len(keys); i++ {
if server.KeyExists(keys[i]) {
if _, err = server.KeyRLock(ctx, keys[i]); err != nil {
return nil, err
}
locks[key] = true
set, ok := server.GetValue(key).(*SortedSet)
locks[keys[i]] = true
set, ok := server.GetValue(keys[i]).(*SortedSet)
if !ok {
return nil, fmt.Errorf("value at key %s is not a sorted set", key)
return nil, fmt.Errorf("value at %s is not a sorted set", keys[i])
}
sets = append(sets, set)
setParams = append(setParams, SortedSetParam{
set: set,
weight: weights[i],
})
}
}
var union *SortedSet
if len(sets) > 1 {
union, err = sets[0].Union(sets[1:], weights, aggregate)
if err != nil {
return nil, err
}
} else if len(sets) == 1 {
union = sets[0]
} else {
return nil, errors.New("no sorted sets to form union")
}
union := Union(aggregate, setParams...)
res := fmt.Sprintf("*%d", union.Cardinality())
for i, m := range union.GetAll() {
for _, m := range union.GetAll() {
if withscores {
s := fmt.Sprintf("%s %f", m.value, m.score)
s := fmt.Sprintf("%s %s", m.value, strconv.FormatFloat(float64(m.score), 'f', -1, 64))
res += fmt.Sprintf("\r\n$%d\r\n%s", len(s), s)
} else {
res += fmt.Sprintf("\r\n+%s", m.value)
}
if i == union.Cardinality()-1 {
}
res += "\r\n\r\n"
}
}
return []byte(res), nil
}

View File

@@ -1617,6 +1617,327 @@ func Test_HandleZRANGE(t *testing.T) {}
func Test_HandleZRANGESTORE(t *testing.T) {}
func Test_HandleZUNION(t *testing.T) {}
func Test_HandleZUNION(t *testing.T) {
mockServer := server.NewServer(server.Opts{})
tests := []struct {
preset bool
presetValues map[string]interface{}
command []string
expectedResponse []string
expectedError error
}{
{ // 1. Get the union between 2 sorted sets.
preset: true,
presetValues: map[string]interface{}{
"key1": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5},
}),
"key2": NewSortedSet([]MemberParam{
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
},
command: []string{"ZUNION", "key1", "key2"},
expectedResponse: []string{"one", "two", "three", "four", "five", "six", "seven", "eight"},
expectedError: nil,
},
{
// 2. Get the union between 3 sorted sets with scores.
// By default, the SUM aggregate will be used.
preset: true,
presetValues: map[string]interface{}{
"key3": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key4": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 8},
}),
"key5": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "eight", score: 8},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12}, {value: "thirty-six", score: 36},
}),
},
command: []string{"ZUNION", "key3", "key4", "key5", "WITHSCORES"},
expectedResponse: []string{
"one 3", "two 4", "three 3", "four 4", "five 5", "six 6",
"seven 7", "eight 24", "nine 9", "ten 10", "eleven 11", "twelve 24", "thirty-six 72"},
expectedError: nil,
},
{
// 3. Get the union between 3 sorted sets with scores.
// Use MIN aggregate.
preset: true,
presetValues: map[string]interface{}{
"key6": NewSortedSet([]MemberParam{
{value: "one", score: 100}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key7": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 80},
}),
"key8": NewSortedSet([]MemberParam{
{value: "one", score: 1000}, {value: "eight", score: 800},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12}, {value: "thirty-six", score: 72},
}),
},
command: []string{"ZUNION", "key6", "key7", "key8", "WITHSCORES", "AGGREGATE", "MIN"},
expectedResponse: []string{
"one 1", "two 2", "three 3", "four 4", "five 5", "six 6", "seven 7", "eight 8",
"nine 9", "ten 10", "eleven 11", "twelve 12", "thirty-six 36",
},
expectedError: nil,
},
{
// 4. Get the union between 3 sorted sets with scores.
// Use MAX aggregate.
preset: true,
presetValues: map[string]interface{}{
"key9": NewSortedSet([]MemberParam{
{value: "one", score: 100}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key10": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 80},
}),
"key11": NewSortedSet([]MemberParam{
{value: "one", score: 1000}, {value: "eight", score: 800},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12}, {value: "thirty-six", score: 72},
}),
},
command: []string{"ZUNION", "key9", "key10", "key11", "WITHSCORES", "AGGREGATE", "MAX"},
expectedResponse: []string{
"one 1000", "two 2", "three 3", "four 4", "five 5", "six 6", "seven 7", "eight 800",
"nine 9", "ten 10", "eleven 11", "twelve 12", "thirty-six 72",
},
expectedError: nil,
},
{
// 5. Get the union between 3 sorted sets with scores.
// Use SUM aggregate with weights modifier.
preset: true,
presetValues: map[string]interface{}{
"key12": NewSortedSet([]MemberParam{
{value: "one", score: 100}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key13": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 80},
}),
"key14": NewSortedSet([]MemberParam{
{value: "one", score: 1000}, {value: "eight", score: 800},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12},
}),
},
command: []string{"ZUNION", "key12", "key13", "key14", "WITHSCORES", "AGGREGATE", "SUM", "WEIGHTS", "1", "2", "3"},
expectedResponse: []string{
"one 3102", "two 6", "three 3", "four 4", "five 5", "six 6", "seven 7", "eight 2568",
"nine 27", "ten 30", "eleven 22", "twelve 60", "thirty-six 72",
},
expectedError: nil,
},
{
// 6. Get the union between 3 sorted sets with scores.
// Use MAX aggregate with added weights.
preset: true,
presetValues: map[string]interface{}{
"key15": NewSortedSet([]MemberParam{
{value: "one", score: 100}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key16": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 80},
}),
"key17": NewSortedSet([]MemberParam{
{value: "one", score: 1000}, {value: "eight", score: 800},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12},
}),
},
command: []string{"ZUNION", "key15", "key16", "key17", "WITHSCORES", "AGGREGATE", "MAX", "WEIGHTS", "1", "2", "3"},
expectedResponse: []string{
"one 3000", "two 4", "three 3", "four 4", "five 5", "six 6", "seven 7", "eight 2400",
"nine 27", "ten 30", "eleven 22", "twelve 36", "thirty-six 72",
},
expectedError: nil,
},
{
// 7. Get the union between 3 sorted sets with scores.
// Use MIN aggregate with added weights.
preset: true,
presetValues: map[string]interface{}{
"key18": NewSortedSet([]MemberParam{
{value: "one", score: 100}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key19": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11}, {value: "eight", score: 80},
}),
"key20": NewSortedSet([]MemberParam{
{value: "one", score: 1000}, {value: "eight", score: 800},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12},
}),
},
command: []string{"ZUNION", "key18", "key19", "key20", "WITHSCORES", "AGGREGATE", "MIN", "WEIGHTS", "1", "2", "3"},
expectedResponse: []string{
"one 2", "two 2", "three 3", "four 4", "five 5", "six 6", "seven 7", "eight 8",
"nine 27", "ten 30", "eleven 22", "twelve 24", "thirty-six 72",
},
expectedError: nil,
},
{ // 8. Throw an error if there are more weights than keys
preset: true,
presetValues: map[string]interface{}{
"key21": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key22": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
},
command: []string{"ZUNION", "key21", "key22", "WEIGHTS", "1", "2", "3"},
expectedResponse: nil,
expectedError: errors.New("number of weights should match number of keys"),
},
{ // 9. Throw an error if there are fewer weights than keys
preset: true,
presetValues: map[string]interface{}{
"key23": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key24": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
}),
"key25": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
},
command: []string{"ZUNION", "key23", "key24", "key25", "WEIGHTS", "5", "4"},
expectedResponse: nil,
expectedError: errors.New("number of weights should match number of keys"),
},
{ // 10. Throw an error if there are no keys provided
preset: true,
presetValues: map[string]interface{}{
"key26": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
"key27": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
"key28": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
},
command: []string{"ZUNION", "WEIGHTS", "5", "4"},
expectedResponse: nil,
expectedError: errors.New(utils.WRONG_ARGS_RESPONSE),
},
{ // 11. Throw an error if any of the provided keys are not sorted sets
preset: true,
presetValues: map[string]interface{}{
"key29": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "three", score: 3}, {value: "four", score: 4},
{value: "five", score: 5}, {value: "six", score: 6},
{value: "seven", score: 7}, {value: "eight", score: 8},
}),
"key30": "Default value",
"key31": NewSortedSet([]MemberParam{{value: "one", score: 1}}),
},
command: []string{"ZUNION", "key29", "key30", "key31"},
expectedResponse: nil,
expectedError: errors.New("value at key30 is not a sorted set"),
},
{ // 12. If any of the keys does not exist, skip it.
preset: true,
presetValues: map[string]interface{}{
"key32": NewSortedSet([]MemberParam{
{value: "one", score: 1}, {value: "two", score: 2},
{value: "thirty-six", score: 36}, {value: "twelve", score: 12},
{value: "eleven", score: 11},
}),
"key33": NewSortedSet([]MemberParam{
{value: "seven", score: 7}, {value: "eight", score: 8},
{value: "nine", score: 9}, {value: "ten", score: 10},
{value: "twelve", score: 12},
}),
},
command: []string{"ZUNION", "non-existent", "key32", "key33"},
expectedResponse: []string{
"one", "two", "thirty-six", "twelve", "eleven",
"seven", "eight", "nine", "ten",
},
expectedError: nil,
},
{ // 13. Command too short
preset: false,
command: []string{"ZUNION"},
expectedResponse: []string{},
expectedError: errors.New(utils.WRONG_ARGS_RESPONSE),
},
}
for _, test := range tests {
if test.preset {
for key, value := range test.presetValues {
if _, err := mockServer.CreateKeyAndLock(context.Background(), key); err != nil {
t.Error(err)
}
mockServer.SetValue(context.Background(), key, value)
mockServer.KeyUnlock(key)
}
}
res, err := handleZUNION(context.Background(), test.command, mockServer, nil)
if test.expectedError != nil {
if err.Error() != test.expectedError.Error() {
t.Errorf("expected error \"%s\", got \"%s\"", test.expectedError.Error(), err.Error())
}
continue
}
if err != nil {
t.Error(err)
}
rd := resp.NewReader(bytes.NewBuffer(res))
rv, _, err := rd.ReadValue()
if err != nil {
t.Error(err)
}
for _, responseElement := range rv.Array() {
if !slices.Contains(test.expectedResponse, responseElement.String()) {
t.Errorf("could not find response element \"%s\" from expected response array", responseElement.String())
}
}
}
}
func Test_HandleZUNIONSTORE(t *testing.T) {}

View File

@@ -208,17 +208,43 @@ func zrangeStoreKeyFunc(cmd []string) ([]string, error) {
}
func zunionKeyFunc(cmd []string) ([]string, error) {
keys, _, _, _, err := extractKeysWeightsAggregateWithScores(cmd)
if err != nil {
return nil, err
if len(cmd) < 2 {
return nil, errors.New(utils.WRONG_ARGS_RESPONSE)
}
return keys, nil
endIdx := slices.IndexFunc(cmd[1:], func(s string) bool {
if strings.EqualFold(s, "WEIGHTS") ||
strings.EqualFold(s, "AGGREGATE") ||
strings.EqualFold(s, "WITHSCORES") {
return true
}
return false
})
if endIdx == -1 {
return cmd[1:], nil
}
if endIdx >= 1 {
return cmd[1:endIdx], nil
}
return nil, errors.New(utils.WRONG_ARGS_RESPONSE)
}
func zunionstoreKeyFunc(cmd []string) ([]string, error) {
keys, _, _, _, err := extractKeysWeightsAggregateWithScores(cmd)
if err != nil {
return nil, err
if len(cmd) < 2 {
return nil, errors.New(utils.WRONG_ARGS_RESPONSE)
}
return keys, nil
endIdx := slices.IndexFunc(cmd[1:], func(s string) bool {
if strings.EqualFold(s, "WEIGHTS") ||
strings.EqualFold(s, "AGGREGATE") ||
strings.EqualFold(s, "WITHSCORES") {
return true
}
return false
})
if endIdx == -1 {
return cmd[1:], nil
}
if endIdx >= 1 {
return cmd[1:endIdx], nil
}
return nil, errors.New(utils.WRONG_ARGS_RESPONSE)
}

View File

@@ -300,6 +300,107 @@ type SortedSetParam struct {
weight int
}
// Union uses divided & conquer to calculate the union of multiple sets
func Union(aggregate string, setParams ...SortedSetParam) *SortedSet {
switch len(setParams) {
case 0:
return NewSortedSet([]MemberParam{})
case 1:
var params []MemberParam
for _, member := range setParams[0].set.GetAll() {
params = append(params, MemberParam{
value: member.value,
score: member.score * Score(setParams[0].weight),
})
}
return NewSortedSet(params)
case 2:
var params []MemberParam
// Traverse the params in the left sorted set
for _, member := range setParams[0].set.GetAll() {
// If the member does not exist in the other sorted set, add it to params along with the appropriate weight
if !setParams[1].set.Contains(member.value) {
params = append(params, MemberParam{
value: member.value,
score: member.score * Score(setParams[0].weight),
})
continue
}
// If the member exists, get both elements and apply the weight
param := MemberParam{
value: member.value,
score: func(left, right Score) Score {
// Choose which param to add to params depending on the aggregate
switch aggregate {
case "sum":
return left + right
case "min":
return compareScores(left, right, "lt")
default:
// Aggregate is "max"
return compareScores(left, right, "gt")
}
}(
member.score*Score(setParams[0].weight),
setParams[1].set.Get(member.value).score*Score(setParams[1].weight),
),
}
params = append(params, param)
}
// Traverse the params on the right sorted set and add all the elements that are not
// already contained in params with their respective weights applied.
for _, member := range setParams[1].set.GetAll() {
if !slices.ContainsFunc(params, func(param MemberParam) bool {
return param.value == member.value
}) {
params = append(params, MemberParam{
value: member.value,
score: member.score * Score(setParams[1].weight),
})
}
}
return NewSortedSet(params)
default:
// Divide the sets into 2 and return the unions
left := Union(aggregate, setParams[0:len(setParams)/2]...)
right := Union(aggregate, setParams[len(setParams)/2:]...)
var params []MemberParam
// Traverse left sub-set and add the union elements to params
for _, member := range left.GetAll() {
if !right.Contains(member.value) {
// If the right set does not contain the current element, just add it to params
params = append(params, member)
continue
}
params = append(params, MemberParam{
value: member.value,
score: func(left, right Score) Score {
switch aggregate {
case "sum":
return left + right
case "min":
return compareScores(left, right, "lt")
default:
// Aggregate is "max"
return compareScores(left, right, "gt")
}
}(member.score, right.Get(member.value).score),
})
}
// Traverse the right sub-set and add any remaining elements to params
for _, member := range right.GetAll() {
if !slices.ContainsFunc(params, func(param MemberParam) bool {
return param.value == member.value
}) {
params = append(params, member)
}
}
return NewSortedSet(params)
}
}
// Intersect uses divide & conquer to calculate the intersection of multiple sets
func Intersect(aggregate string, setParams ...SortedSetParam) *SortedSet {
switch len(setParams) {
case 0: