This commit is contained in:
finley
2022-08-21 05:30:00 +08:00
parent 9da335811f
commit dddf9a8f6b
4 changed files with 71 additions and 2 deletions

View File

@@ -514,6 +514,37 @@ func execZRemRangeByRank(db *DB, args [][]byte) redis.Reply {
return protocol.MakeIntReply(removed)
}
func execZPopMin(db *DB, args [][]byte) redis.Reply {
key := string(args[0])
count := 1
if len(args) > 1 {
var err error
count, err = strconv.Atoi(string(args[1]))
if err != nil {
return protocol.MakeErrReply("ERR value is not an integer or out of range")
}
}
sortedSet, errReply := db.getAsSortedSet(key)
if errReply != nil {
return errReply
}
if sortedSet == nil {
return protocol.MakeEmptyMultiBulkReply()
}
removed := sortedSet.PopMin(count)
if len(removed) > 0 {
db.addAof(utils.ToCmdLine3("zpopmin", args...))
}
result := make([][]byte, 0, len(removed)*2)
for _, element := range removed {
scoreStr := strconv.FormatFloat(element.Score, 'f', -1, 64)
result = append(result, []byte(element.Member), []byte(scoreStr))
}
return protocol.MakeMultiBulkReply(result)
}
// execZRem removes given members
func execZRem(db *DB, args [][]byte) redis.Reply {
// parse args
@@ -602,6 +633,7 @@ func init() {
RegisterCommand("ZRangeByScore", execZRangeByScore, readFirstKey, nil, -4, flagReadOnly)
RegisterCommand("ZRevRange", execZRevRange, readFirstKey, nil, -4, flagReadOnly)
RegisterCommand("ZRevRangeByScore", execZRevRangeByScore, readFirstKey, nil, -4, flagReadOnly)
RegisterCommand("ZPopMin", execZPopMin, writeFirstKey, rollbackFirstKey, -2, flagWrite)
RegisterCommand("ZRem", execZRem, writeFirstKey, undoZRem, -3, flagWrite)
RegisterCommand("ZRemRangeByScore", execZRemRangeByScore, writeFirstKey, rollbackFirstKey, 4, flagWrite)
RegisterCommand("ZRemRangeByRank", execZRemRangeByRank, writeFirstKey, rollbackFirstKey, 4, flagWrite)

View File

@@ -303,3 +303,21 @@ func TestZIncrBy(t *testing.T) {
result = testDB.Exec(nil, utils.ToCmdLine("ZScore", key, "a"))
asserts.AssertBulkReply(t, result, "20")
}
func TestZPopMin(t *testing.T) {
testDB.Flush()
key := utils.RandString(10)
result := testDB.Exec(nil, utils.ToCmdLine("ZAdd", key, "1", "a", "1", "b", "2", "c"))
asserts.AssertNotError(t, result)
result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key, "2"))
asserts.AssertMultiBulkReply(t, result, []string{"a", "1", "b", "1"})
result = testDB.Exec(nil, utils.ToCmdLine("ZRange", key, "0", "-1"))
asserts.AssertMultiBulkReply(t, result, []string{"c"})
result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key+"1", "2"))
asserts.AssertMultiBulkReplySize(t, result, 0)
testDB.Exec(nil, utils.ToCmdLine("set", key+"2", "2"))
result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key+"2", "2"))
asserts.AssertErrReply(t, result, "WRONGTYPE Operation against a key holding the wrong kind of value")
}

View File

@@ -281,7 +281,7 @@ func (skiplist *skiplist) getLastInScoreRange(min *ScoreBorder, max *ScoreBorder
/*
* return removed elements
*/
func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder) (removed []*Element) {
func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder, limit int) (removed []*Element) {
update := make([]*node, maxLevel)
removed = make([]*Element, 0)
// find backward nodes (of target range) or last node of each level
@@ -308,6 +308,9 @@ func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder)
removedElement := node.Element
removed = append(removed, &removedElement)
skiplist.removeNode(node, update)
if limit > 0 && len(removed) == limit {
break
}
node = next
}
return removed

View File

@@ -204,13 +204,29 @@ func (sortedSet *SortedSet) RangeByScore(min *ScoreBorder, max *ScoreBorder, off
// RemoveByScore removes members which score within the given border
func (sortedSet *SortedSet) RemoveByScore(min *ScoreBorder, max *ScoreBorder) int64 {
removed := sortedSet.skiplist.RemoveRangeByScore(min, max)
removed := sortedSet.skiplist.RemoveRangeByScore(min, max, 0)
for _, element := range removed {
delete(sortedSet.dict, element.Member)
}
return int64(len(removed))
}
func (sortedSet *SortedSet) PopMin(count int) []*Element {
first := sortedSet.skiplist.getFirstInScoreRange(negativeInfBorder, positiveInfBorder)
if first == nil {
return nil
}
border := &ScoreBorder{
Value: first.Score,
Exclude: false,
}
removed := sortedSet.skiplist.RemoveRangeByScore(border, border, count)
for _, element := range removed {
delete(sortedSet.dict, element.Member)
}
return removed
}
// RemoveByRank removes member ranking within [start, stop)
// sort by ascending order and rank starts from 0
func (sortedSet *SortedSet) RemoveByRank(start int64, stop int64) int64 {