diff --git a/database/sortedset.go b/database/sortedset.go index 3ee0e2d..c1012ae 100644 --- a/database/sortedset.go +++ b/database/sortedset.go @@ -514,6 +514,37 @@ func execZRemRangeByRank(db *DB, args [][]byte) redis.Reply { return protocol.MakeIntReply(removed) } +func execZPopMin(db *DB, args [][]byte) redis.Reply { + key := string(args[0]) + count := 1 + if len(args) > 1 { + var err error + count, err = strconv.Atoi(string(args[1])) + if err != nil { + return protocol.MakeErrReply("ERR value is not an integer or out of range") + } + } + + sortedSet, errReply := db.getAsSortedSet(key) + if errReply != nil { + return errReply + } + if sortedSet == nil { + return protocol.MakeEmptyMultiBulkReply() + } + + removed := sortedSet.PopMin(count) + if len(removed) > 0 { + db.addAof(utils.ToCmdLine3("zpopmin", args...)) + } + result := make([][]byte, 0, len(removed)*2) + for _, element := range removed { + scoreStr := strconv.FormatFloat(element.Score, 'f', -1, 64) + result = append(result, []byte(element.Member), []byte(scoreStr)) + } + return protocol.MakeMultiBulkReply(result) +} + // execZRem removes given members func execZRem(db *DB, args [][]byte) redis.Reply { // parse args @@ -602,6 +633,7 @@ func init() { RegisterCommand("ZRangeByScore", execZRangeByScore, readFirstKey, nil, -4, flagReadOnly) RegisterCommand("ZRevRange", execZRevRange, readFirstKey, nil, -4, flagReadOnly) RegisterCommand("ZRevRangeByScore", execZRevRangeByScore, readFirstKey, nil, -4, flagReadOnly) + RegisterCommand("ZPopMin", execZPopMin, writeFirstKey, rollbackFirstKey, -2, flagWrite) RegisterCommand("ZRem", execZRem, writeFirstKey, undoZRem, -3, flagWrite) RegisterCommand("ZRemRangeByScore", execZRemRangeByScore, writeFirstKey, rollbackFirstKey, 4, flagWrite) RegisterCommand("ZRemRangeByRank", execZRemRangeByRank, writeFirstKey, rollbackFirstKey, 4, flagWrite) diff --git a/database/sortedset_test.go b/database/sortedset_test.go index a6cad0f..f9385e6 100644 --- a/database/sortedset_test.go +++ b/database/sortedset_test.go @@ -303,3 +303,21 @@ func TestZIncrBy(t *testing.T) { result = testDB.Exec(nil, utils.ToCmdLine("ZScore", key, "a")) asserts.AssertBulkReply(t, result, "20") } + +func TestZPopMin(t *testing.T) { + testDB.Flush() + key := utils.RandString(10) + result := testDB.Exec(nil, utils.ToCmdLine("ZAdd", key, "1", "a", "1", "b", "2", "c")) + asserts.AssertNotError(t, result) + result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key, "2")) + asserts.AssertMultiBulkReply(t, result, []string{"a", "1", "b", "1"}) + result = testDB.Exec(nil, utils.ToCmdLine("ZRange", key, "0", "-1")) + asserts.AssertMultiBulkReply(t, result, []string{"c"}) + + result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key+"1", "2")) + asserts.AssertMultiBulkReplySize(t, result, 0) + + testDB.Exec(nil, utils.ToCmdLine("set", key+"2", "2")) + result = testDB.Exec(nil, utils.ToCmdLine("ZPopMin", key+"2", "2")) + asserts.AssertErrReply(t, result, "WRONGTYPE Operation against a key holding the wrong kind of value") +} diff --git a/datastruct/sortedset/skiplist.go b/datastruct/sortedset/skiplist.go index 73d2e5c..4890b51 100644 --- a/datastruct/sortedset/skiplist.go +++ b/datastruct/sortedset/skiplist.go @@ -281,7 +281,7 @@ func (skiplist *skiplist) getLastInScoreRange(min *ScoreBorder, max *ScoreBorder /* * return removed elements */ -func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder) (removed []*Element) { +func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder, limit int) (removed []*Element) { update := make([]*node, maxLevel) removed = make([]*Element, 0) // find backward nodes (of target range) or last node of each level @@ -308,6 +308,9 @@ func (skiplist *skiplist) RemoveRangeByScore(min *ScoreBorder, max *ScoreBorder) removedElement := node.Element removed = append(removed, &removedElement) skiplist.removeNode(node, update) + if limit > 0 && len(removed) == limit { + break + } node = next } return removed diff --git a/datastruct/sortedset/sortedset.go b/datastruct/sortedset/sortedset.go index 612cfd8..8d0f0ed 100644 --- a/datastruct/sortedset/sortedset.go +++ b/datastruct/sortedset/sortedset.go @@ -204,13 +204,29 @@ func (sortedSet *SortedSet) RangeByScore(min *ScoreBorder, max *ScoreBorder, off // RemoveByScore removes members which score within the given border func (sortedSet *SortedSet) RemoveByScore(min *ScoreBorder, max *ScoreBorder) int64 { - removed := sortedSet.skiplist.RemoveRangeByScore(min, max) + removed := sortedSet.skiplist.RemoveRangeByScore(min, max, 0) for _, element := range removed { delete(sortedSet.dict, element.Member) } return int64(len(removed)) } +func (sortedSet *SortedSet) PopMin(count int) []*Element { + first := sortedSet.skiplist.getFirstInScoreRange(negativeInfBorder, positiveInfBorder) + if first == nil { + return nil + } + border := &ScoreBorder{ + Value: first.Score, + Exclude: false, + } + removed := sortedSet.skiplist.RemoveRangeByScore(border, border, count) + for _, element := range removed { + delete(sortedSet.dict, element.Member) + } + return removed +} + // RemoveByRank removes member ranking within [start, stop) // sort by ascending order and rank starts from 0 func (sortedSet *SortedSet) RemoveByRank(start int64, stop int64) int64 {