// Commands from https://redis.io/commands#stream package miniredis import ( "errors" "fmt" "sort" "strconv" "strings" "time" "github.com/alicebob/miniredis/v2/server" ) // commandsStream handles all stream operations. func commandsStream(m *Miniredis) { m.srv.Register("XADD", m.cmdXadd) m.srv.Register("XLEN", m.cmdXlen) m.srv.Register("XREAD", m.cmdXread) m.srv.Register("XRANGE", m.makeCmdXrange(false)) m.srv.Register("XREVRANGE", m.makeCmdXrange(true)) m.srv.Register("XGROUP", m.cmdXgroup) m.srv.Register("XINFO", m.cmdXinfo) m.srv.Register("XREADGROUP", m.cmdXreadgroup) m.srv.Register("XACK", m.cmdXack) m.srv.Register("XDEL", m.cmdXdel) m.srv.Register("XPENDING", m.cmdXpending) m.srv.Register("XTRIM", m.cmdXtrim) m.srv.Register("XAUTOCLAIM", m.cmdXautoclaim) m.srv.Register("XCLAIM", m.cmdXclaim) } // XADD func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) { if len(args) < 4 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } if !m.handleAuth(c) { return } if m.checkPubsub(c, cmd) { return } key, args := args[0], args[1:] withTx(m, c, func(c *server.Peer, ctx *connCtx) { maxlen := -1 if strings.ToLower(args[0]) == "maxlen" { args = args[1:] // we don't treat "~" special if args[0] == "~" { args = args[1:] } n, err := strconv.Atoi(args[0]) if err != nil { c.WriteError(msgInvalidInt) return } if n < 0 { c.WriteError("ERR The MAXLEN argument must be >= 0.") return } maxlen = n args = args[1:] } if len(args) < 1 { c.WriteError(errWrongNumber(cmd)) return } entryID, args := args[0], args[1:] // args must be composed of field/value pairs. if len(args) == 0 || len(args)%2 != 0 { c.WriteError("ERR wrong number of arguments for XADD") // non-default message return } var values []string for len(args) > 0 { values = append(values, args[0], args[1]) args = args[2:] } db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { // TODO: NOMKSTREAM s, _ = db.newStream(key) } newID, err := s.add(entryID, values, m.effectiveNow()) if err != nil { switch err { case errInvalidEntryID: c.WriteError(msgInvalidStreamID) default: c.WriteError(err.Error()) } return } if maxlen >= 0 { s.trim(maxlen) } db.keyVersion[key]++ c.WriteBulk(newID) }) } // XLEN func (m *Miniredis) cmdXlen(c *server.Peer, cmd string, args []string) { if len(args) != 1 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } if !m.handleAuth(c) { return } if m.checkPubsub(c, cmd) { return } key := args[0] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) } if s == nil { // No such key. That's zero length. c.WriteInt(0) return } c.WriteInt(len(s.entries)) }) } // XRANGE and XREVRANGE func (m *Miniredis) makeCmdXrange(reverse bool) server.Cmd { return func(c *server.Peer, cmd string, args []string) { if len(args) < 3 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } if len(args) == 4 || len(args) > 5 { setDirty(c) c.WriteError(msgSyntaxError) return } if !m.handleAuth(c) { return } if m.checkPubsub(c, cmd) { return } opts := struct { key string startKey string startExclusive bool endKey string endExclusive bool }{ key: args[0], startKey: args[1], endKey: args[2], } if strings.HasPrefix(opts.startKey, "(") { opts.startExclusive = true opts.startKey = opts.startKey[1:] if opts.startKey == "-" || opts.startKey == "+" { setDirty(c) c.WriteError(msgInvalidStreamID) return } } if strings.HasPrefix(opts.endKey, "(") { opts.endExclusive = true opts.endKey = opts.endKey[1:] if opts.endKey == "-" || opts.endKey == "+" { setDirty(c) c.WriteError(msgInvalidStreamID) return } } countArg := "0" if len(args) == 5 { if strings.ToLower(args[3]) != "count" { setDirty(c) c.WriteError(msgSyntaxError) return } countArg = args[4] } withTx(m, c, func(c *server.Peer, ctx *connCtx) { start, err := formatStreamRangeBound(opts.startKey, true, reverse) if err != nil { c.WriteError(msgInvalidStreamID) return } end, err := formatStreamRangeBound(opts.endKey, false, reverse) if err != nil { c.WriteError(msgInvalidStreamID) return } count, err := strconv.Atoi(countArg) if err != nil { c.WriteError(msgInvalidInt) return } db := m.db(ctx.selectedDB) if !db.exists(opts.key) { c.WriteLen(0) return } if db.t(opts.key) != "stream" { c.WriteError(ErrWrongType.Error()) return } var entries = db.streamKeys[opts.key].entries if reverse { entries = reversedStreamEntries(entries) } if count == 0 { count = len(entries) } var returnedEntries []StreamEntry for _, entry := range entries { if len(returnedEntries) == count { break } if !reverse { // Break if entry ID > end if streamCmp(entry.ID, end) == 1 { break } // Continue if entry ID < start if streamCmp(entry.ID, start) == -1 { continue } } else { // Break if entry iD < end if streamCmp(entry.ID, end) == -1 { break } // Continue if entry ID > start. if streamCmp(entry.ID, start) == 1 { continue } } // Continue if start exclusive and entry ID == start if opts.startExclusive && streamCmp(entry.ID, start) == 0 { continue } // Continue if end exclusive and entry ID == end if opts.endExclusive && streamCmp(entry.ID, end) == 0 { continue } returnedEntries = append(returnedEntries, entry) } c.WriteLen(len(returnedEntries)) for _, entry := range returnedEntries { c.WriteLen(2) c.WriteBulk(entry.ID) c.WriteLen(len(entry.Values)) for _, v := range entry.Values { c.WriteBulk(v) } } }) } } // XGROUP func (m *Miniredis) cmdXgroup(c *server.Peer, cmd string, args []string) { if len(args) == 0 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } subCmd, args := strings.ToUpper(args[0]), args[1:] switch subCmd { case "CREATE": m.cmdXgroupCreate(c, cmd, args) case "DESTROY": m.cmdXgroupDestroy(c, cmd, args) case "CREATECONSUMER": m.cmdXgroupCreateconsumer(c, cmd, args) case "DELCONSUMER": m.cmdXgroupDelconsumer(c, cmd, args) case "HELP", "SETID": err := fmt.Sprintf("ERR 'XGROUP %s' not supported", subCmd) setDirty(c) c.WriteError(err) default: setDirty(c) c.WriteError(fmt.Sprintf( "ERR Unknown subcommand or wrong number of arguments for '%s'. Try XGROUP HELP.", subCmd, )) } } // XGROUP CREATE func (m *Miniredis) cmdXgroupCreate(c *server.Peer, cmd string, args []string) { if len(args) != 3 && len(args) != 4 { setDirty(c) c.WriteError(errWrongNumber("CREATE")) return } stream, group, id := args[0], args[1], args[2] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(stream) if err != nil { c.WriteError(err.Error()) return } if s == nil && len(args) == 4 && strings.ToUpper(args[3]) == "MKSTREAM" { if s, err = db.newStream(stream); err != nil { c.WriteError(err.Error()) return } } if s == nil { c.WriteError(msgXgroupKeyNotFound) return } if err := s.createGroup(group, id); err != nil { c.WriteError(err.Error()) return } c.WriteOK() }) } // XGROUP DESTROY func (m *Miniredis) cmdXgroupDestroy(c *server.Peer, cmd string, args []string) { if len(args) != 2 { setDirty(c) c.WriteError(errWrongNumber("DESTROY")) return } stream, groupName := args[0], args[1] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(stream) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgXgroupKeyNotFound) return } if _, ok := s.groups[groupName]; !ok { c.WriteInt(0) return } delete(s.groups, groupName) c.WriteInt(1) }) } // XGROUP CREATECONSUMER func (m *Miniredis) cmdXgroupCreateconsumer(c *server.Peer, cmd string, args []string) { if len(args) != 3 { setDirty(c) c.WriteError(errWrongNumber("CREATECONSUMER")) return } key, groupName, consumerName := args[0], args[1], args[2] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgXgroupKeyNotFound) return } g, ok := s.groups[groupName] if !ok { err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) c.WriteError(err) return } if _, ok = g.consumers[consumerName]; ok { c.WriteInt(0) return } g.consumers[consumerName] = &consumer{} c.WriteInt(1) }) } // XGROUP DELCONSUMER func (m *Miniredis) cmdXgroupDelconsumer(c *server.Peer, cmd string, args []string) { if len(args) != 3 { setDirty(c) c.WriteError(errWrongNumber("DELCONSUMER")) return } key, groupName, consumerName := args[0], args[1], args[2] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgXgroupKeyNotFound) return } g, ok := s.groups[groupName] if !ok { err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) c.WriteError(err) return } consumer, ok := g.consumers[consumerName] if !ok { c.WriteInt(0) return } defer delete(g.consumers, consumerName) if consumer.numPendingEntries > 0 { newPending := make([]pendingEntry, 0) for _, entry := range g.pending { if entry.consumer != consumerName { newPending = append(newPending, entry) } } g.pending = newPending } c.WriteInt(consumer.numPendingEntries) }) } // XINFO func (m *Miniredis) cmdXinfo(c *server.Peer, cmd string, args []string) { if len(args) < 1 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } subCmd, args := strings.ToUpper(args[0]), args[1:] switch subCmd { case "STREAM": m.cmdXinfoStream(c, args) case "CONSUMERS": m.cmdXinfoConsumers(c, args) case "GROUPS": m.cmdXinfoGroups(c, args) case "HELP": err := fmt.Sprintf("'XINFO %s' not supported", strings.Join(args, " ")) setDirty(c) c.WriteError(err) default: setDirty(c) c.WriteError(fmt.Sprintf( "ERR Unknown subcommand or wrong number of arguments for '%s'. Try XINFO HELP.", subCmd, )) } } // XINFO STREAM // Produces only part of full command output func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { if len(args) < 1 { setDirty(c) c.WriteError(errWrongNumber("STREAM")) return } key := args[0] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgKeyNotFound) return } c.WriteMapLen(1) c.WriteBulk("length") c.WriteInt(len(s.entries)) }) } // XINFO GROUPS func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) { if len(args) != 1 { setDirty(c) c.WriteError(errWrongNumber("GROUPS")) return } key := args[0] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgKeyNotFound) return } c.WriteLen(len(s.groups)) for name, g := range s.groups { c.WriteMapLen(4) c.WriteBulk("name") c.WriteBulk(name) c.WriteBulk("consumers") c.WriteInt(len(g.consumers)) c.WriteBulk("pending") c.WriteInt(len(g.pending)) c.WriteBulk("last-delivered-id") c.WriteBulk(g.lastID) } }) } // XINFO CONSUMERS // Please note that this is only a partial implementation, for it does not // return each consumer's "idle" value, which indicates "the number of // milliseconds that have passed since the consumer last interacted with the // server." func (m *Miniredis) cmdXinfoConsumers(c *server.Peer, args []string) { if len(args) != 2 { setDirty(c) c.WriteError(errWrongNumber("CONSUMERS")) return } key := args[0] groupName := args[1] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(key) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteError(msgKeyNotFound) return } g, ok := s.groups[groupName] if !ok { err := fmt.Sprintf("NOGROUP No such consumer group '%s' for key name '%s'", groupName, key) c.WriteError(err) return } consumerNames := make([]string, 0) for name := range g.consumers { consumerNames = append(consumerNames, name) } sort.Strings(consumerNames) c.WriteLen(len(consumerNames)) for _, name := range consumerNames { c.WriteMapLen(2) c.WriteBulk("name") c.WriteBulk(name) c.WriteBulk("pending") c.WriteInt(g.consumers[name].numPendingEntries) } }) } // XREADGROUP func (m *Miniredis) cmdXreadgroup(c *server.Peer, cmd string, args []string) { // XREADGROUP GROUP group consumer STREAMS key ID if len(args) < 6 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var opts struct { group string consumer string count int noack bool streams []string ids []string block bool blockTimeout time.Duration } if strings.ToUpper(args[0]) != "GROUP" { setDirty(c) c.WriteError(msgSyntaxError) return } opts.group, opts.consumer, args = args[1], args[2], args[3:] var err error parsing: for len(args) > 0 { switch strings.ToUpper(args[0]) { case "COUNT": if len(args) < 2 { err = errors.New(errWrongNumber(cmd)) break parsing } opts.count, err = strconv.Atoi(args[1]) if err != nil { break parsing } args = args[2:] case "BLOCK": err = parseBlock(cmd, args, &opts.block, &opts.blockTimeout) if err != nil { break parsing } args = args[2:] case "NOACK": args = args[1:] opts.noack = true case "STREAMS": args = args[1:] if len(args)%2 != 0 { err = errors.New(msgXreadUnbalanced) break parsing } opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:] break parsing default: err = fmt.Errorf("ERR incorrect argument %s", args[0]) break parsing } } if err != nil { setDirty(c) c.WriteError(err.Error()) return } if len(opts.streams) == 0 || len(opts.ids) == 0 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } for _, id := range opts.ids { if id != `>` { opts.block = false } } if !opts.block { withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) res, err := xreadgroup( db, opts.group, opts.consumer, opts.noack, opts.streams, opts.ids, opts.count, m.effectiveNow(), ) if err != nil { c.WriteError(err.Error()) return } writeXread(c, opts.streams, res) }) return } blocking( m, c, opts.blockTimeout, func(c *server.Peer, ctx *connCtx) bool { db := m.db(ctx.selectedDB) res, err := xreadgroup( db, opts.group, opts.consumer, opts.noack, opts.streams, opts.ids, opts.count, m.effectiveNow(), ) if err != nil { c.WriteError(err.Error()) return true } if len(res) == 0 { return false } writeXread(c, opts.streams, res) return true }, func(c *server.Peer) { // timeout c.WriteLen(-1) }, ) } func xreadgroup( db *RedisDB, group, consumer string, noack bool, streams []string, ids []string, count int, now time.Time, ) (map[string][]StreamEntry, error) { res := map[string][]StreamEntry{} for i, key := range streams { id := ids[i] g, err := db.streamGroup(key, group) if err != nil { return nil, err } if g == nil { return nil, errXreadgroup(key, group) } if _, err := parseStreamID(id); id != `>` && err != nil { return nil, err } entries := g.readGroup(now, consumer, id, count, noack) if id == `>` && len(entries) == 0 { continue } res[key] = entries } return res, nil } // XACK func (m *Miniredis) cmdXack(c *server.Peer, cmd string, args []string) { if len(args) < 3 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } key, group, ids := args[0], args[1], args[2:] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) g, err := db.streamGroup(key, group) if err != nil { c.WriteError(err.Error()) return } if g == nil { c.WriteInt(0) return } cnt, err := g.ack(ids) if err != nil { c.WriteError(err.Error()) return } c.WriteInt(cnt) }) } // XDEL func (m *Miniredis) cmdXdel(c *server.Peer, cmd string, args []string) { if len(args) < 2 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } stream, ids := args[0], args[1:] withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(stream) if err != nil { c.WriteError(err.Error()) return } if s == nil { c.WriteInt(0) return } n, err := s.delete(ids) if err != nil { c.WriteError(err.Error()) return } db.keyVersion[stream]++ c.WriteInt(n) }) } // XREAD func (m *Miniredis) cmdXread(c *server.Peer, cmd string, args []string) { if len(args) < 3 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var ( opts struct { count int streams []string ids []string block bool blockTimeout time.Duration } err error ) parsing: for len(args) > 0 { switch strings.ToUpper(args[0]) { case "COUNT": if len(args) < 2 { err = errors.New(errWrongNumber(cmd)) break parsing } opts.count, err = strconv.Atoi(args[1]) if err != nil { break parsing } args = args[2:] case "BLOCK": err = parseBlock(cmd, args, &opts.block, &opts.blockTimeout) if err != nil { break parsing } args = args[2:] case "STREAMS": args = args[1:] if len(args)%2 != 0 { err = errors.New(msgXreadUnbalanced) break parsing } opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:] for _, id := range opts.ids { if _, err := parseStreamID(id); id != `$` && err != nil { setDirty(c) c.WriteError(msgInvalidStreamID) return } } args = nil break parsing default: err = fmt.Errorf("ERR incorrect argument %s", args[0]) break parsing } } if err != nil { setDirty(c) c.WriteError(err.Error()) return } if !opts.block { withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) res := xread(db, opts.streams, opts.ids, opts.count) writeXread(c, opts.streams, res) }) return } blocking( m, c, opts.blockTimeout, func(c *server.Peer, ctx *connCtx) bool { db := m.db(ctx.selectedDB) res := xread(db, opts.streams, opts.ids, opts.count) if len(res) == 0 { return false } writeXread(c, opts.streams, res) return true }, func(c *server.Peer) { // timeout c.WriteLen(-1) }, ) } func xread(db *RedisDB, streams []string, ids []string, count int) map[string][]StreamEntry { res := map[string][]StreamEntry{} for i := range streams { stream := streams[i] id := ids[i] var s, ok = db.streamKeys[stream] if !ok { continue } entries := s.entries if len(entries) == 0 { continue } entryCount := count if entryCount == 0 { entryCount = len(entries) } var returnedEntries []StreamEntry for _, entry := range entries { if len(returnedEntries) == entryCount { break } if id == "$" { id = s.lastID() } if streamCmp(entry.ID, id) <= 0 { continue } returnedEntries = append(returnedEntries, entry) } if len(returnedEntries) > 0 { res[stream] = returnedEntries } } return res } func writeXread(c *server.Peer, streams []string, res map[string][]StreamEntry) { if len(res) == 0 { c.WriteLen(-1) return } c.WriteLen(len(res)) for _, stream := range streams { entries, ok := res[stream] if !ok { continue } c.WriteLen(2) c.WriteBulk(stream) c.WriteLen(len(entries)) for _, entry := range entries { c.WriteLen(2) c.WriteBulk(entry.ID) c.WriteLen(len(entry.Values)) for _, v := range entry.Values { c.WriteBulk(v) } } } } // XPENDING func (m *Miniredis) cmdXpending(c *server.Peer, cmd string, args []string) { if len(args) < 2 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var opts struct { key string group string summary bool idle time.Duration start, end string count int consumer *string } opts.key, opts.group, args = args[0], args[1], args[2:] opts.summary = true if len(args) >= 3 { opts.summary = false if strings.ToUpper(args[0]) == "IDLE" { idleMs, err := strconv.ParseInt(args[1], 10, 64) if err != nil { setDirty(c) c.WriteError(msgInvalidInt) return } opts.idle = time.Duration(idleMs) * time.Millisecond args = args[2:] if len(args) < 3 { setDirty(c) c.WriteError(msgSyntaxError) return } } var err error opts.start, err = formatStreamRangeBound(args[0], true, false) if err != nil { setDirty(c) c.WriteError(msgInvalidStreamID) return } opts.end, err = formatStreamRangeBound(args[1], false, false) if err != nil { setDirty(c) c.WriteError(msgInvalidStreamID) return } opts.count, err = strconv.Atoi(args[2]) // negative is allowed if err != nil { setDirty(c) c.WriteError(msgInvalidInt) return } args = args[3:] if len(args) == 1 { opts.consumer, args = &args[0], args[1:] } } if len(args) != 0 { setDirty(c) c.WriteError(msgSyntaxError) return } withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) g, err := db.streamGroup(opts.key, opts.group) if err != nil { c.WriteError(err.Error()) return } if g == nil { c.WriteError(errReadgroup(opts.key, opts.group).Error()) return } if opts.summary { writeXpendingSummary(c, *g) return } writeXpending(m.effectiveNow(), c, *g, opts.idle, opts.start, opts.end, opts.count, opts.consumer) }) } func writeXpendingSummary(c *server.Peer, g streamGroup) { if len(g.pending) == 0 { c.WriteLen(4) c.WriteInt(0) c.WriteNull() c.WriteNull() c.WriteLen(-1) return } // format: // - number of pending // - smallest ID // - highest ID // - all consumers with > 0 pending items c.WriteLen(4) c.WriteInt(len(g.pending)) c.WriteBulk(g.pending[0].id) c.WriteBulk(g.pending[len(g.pending)-1].id) cons := map[string]int{} for id := range g.consumers { cnt := g.pendingCount(id) if cnt > 0 { cons[id] = cnt } } c.WriteLen(len(cons)) var ids []string for id := range cons { ids = append(ids, id) } sort.Strings(ids) // be predicatable for _, id := range ids { c.WriteLen(2) c.WriteBulk(id) c.WriteBulk(strconv.Itoa(cons[id])) } } func writeXpending( now time.Time, c *server.Peer, g streamGroup, idle time.Duration, start, end string, count int, consumer *string, ) { if len(g.pending) == 0 || count < 0 { c.WriteLen(-1) return } // format, list of: // - message ID // - consumer // - milliseconds since delivery // - delivery count type entry struct { id string consumer string millis int count int } var res []entry for _, p := range g.pending { if len(res) >= count { break } if consumer != nil && p.consumer != *consumer { continue } if streamCmp(p.id, start) < 0 { continue } if streamCmp(p.id, end) > 0 { continue } timeSinceLastDelivery := now.Sub(p.lastDelivery) if timeSinceLastDelivery >= idle { res = append(res, entry{ id: p.id, consumer: p.consumer, millis: int(timeSinceLastDelivery.Milliseconds()), count: p.deliveryCount, }) } } if len(res) == 0 { c.WriteLen(-1) return } c.WriteLen(len(res)) for _, e := range res { c.WriteLen(4) c.WriteBulk(e.id) c.WriteBulk(e.consumer) c.WriteInt(e.millis) c.WriteInt(e.count) } } // XTRIM func (m *Miniredis) cmdXtrim(c *server.Peer, cmd string, args []string) { if len(args) < 3 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var opts struct { stream string strategy string maxLen int // for MAXLEN threshold string // for MINID withLimit bool // "LIMIT" withExact bool // "=" withNearly bool // "~" } opts.stream, opts.strategy, args = args[0], strings.ToUpper(args[1]), args[2:] if opts.strategy != "MAXLEN" && opts.strategy != "MINID" { setDirty(c) c.WriteError(msgXtrimInvalidStrategy) return } // Ignore nearly exact trimming parameters. switch args[0] { case "=": opts.withExact = true args = args[1:] case "~": opts.withNearly = true args = args[1:] } switch opts.strategy { case "MAXLEN": maxLen, err := strconv.Atoi(args[0]) if err != nil { setDirty(c) c.WriteError(msgXtrimInvalidMaxLen) return } opts.maxLen = maxLen case "MINID": opts.threshold = args[0] } args = args[1:] if len(args) == 2 && strings.ToUpper(args[0]) == "LIMIT" { // Ignore LIMIT. opts.withLimit = true if _, err := strconv.Atoi(args[1]); err != nil { setDirty(c) c.WriteError(msgInvalidInt) return } args = args[2:] } if len(args) != 0 { setDirty(c) c.WriteError(fmt.Sprintf("ERR incorrect argument %s", args[0])) return } if opts.withLimit && !opts.withNearly { setDirty(c) c.WriteError(fmt.Sprintf(msgXtrimInvalidLimit)) return } withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) s, err := db.stream(opts.stream) if err != nil { setDirty(c) c.WriteError(err.Error()) return } if s == nil { c.WriteInt(0) return } switch opts.strategy { case "MAXLEN": entriesBefore := len(s.entries) s.trim(opts.maxLen) c.WriteInt(entriesBefore - len(s.entries)) case "MINID": var delete []string for _, entry := range s.entries { if entry.ID < opts.threshold { delete = append(delete, entry.ID) } else { break } } s.delete(delete) c.WriteInt(len(delete)) } }) } // XAUTOCLAIM func (m *Miniredis) cmdXautoclaim(c *server.Peer, cmd string, args []string) { // XAUTOCLAIM key group consumer min-idle-time start if len(args) < 5 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var opts struct { key string group string consumer string minIdleTime time.Duration start string justId bool count int } opts.key, opts.group, opts.consumer = args[0], args[1], args[2] n, err := strconv.Atoi(args[3]) if err != nil { setDirty(c) c.WriteError("ERR Invalid min-idle-time argument for XAUTOCLAIM") return } opts.minIdleTime = time.Millisecond * time.Duration(n) start_, err := formatStreamRangeBound(args[4], true, false) if err != nil { c.WriteError(msgInvalidStreamID) return } opts.start = start_ args = args[5:] opts.count = 100 parsing: for len(args) > 0 { switch strings.ToUpper(args[0]) { case "COUNT": if len(args) < 2 { err = errors.New(errWrongNumber(cmd)) break parsing } opts.count, err = strconv.Atoi(args[1]) if err != nil { break parsing } args = args[2:] case "JUSTID": args = args[1:] opts.justId = true default: err = errors.New(msgSyntaxError) break parsing } } if err != nil { setDirty(c) c.WriteError(err.Error()) return } withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) g, err := db.streamGroup(opts.key, opts.group) if err != nil { c.WriteError(err.Error()) return } if g == nil { c.WriteError(errReadgroup(opts.key, opts.group).Error()) return } nextCallId, entries := xautoclaim(m.effectiveNow(), *g, opts.minIdleTime, opts.start, opts.count, opts.consumer) writeXautoclaim(c, nextCallId, entries, opts.justId) }) } func xautoclaim( now time.Time, g streamGroup, minIdleTime time.Duration, start string, count int, consumerID string, ) (string, []StreamEntry) { nextCallId := "0-0" if len(g.pending) == 0 || count < 0 { return nextCallId, nil } msgs := g.pendingAfter(start) var res []StreamEntry for i, p := range msgs { if minIdleTime > 0 && now.Before(p.lastDelivery.Add(minIdleTime)) { continue } prevConsumerID := p.consumer if _, ok := g.consumers[consumerID]; !ok { g.consumers[consumerID] = &consumer{} } p.consumer = consumerID _, entry := g.stream.get(p.id) // not found. Weird? if entry == nil { // TODO: support third element of return from XAUTOCLAIM, which // should delete entries not found in the PEL during XAUTOCLAIM. // (Introduced in Redis 7.0) continue } p.deliveryCount += 1 p.lastDelivery = now g.consumers[prevConsumerID].numPendingEntries-- g.consumers[consumerID].numPendingEntries++ msgs[i] = p res = append(res, *entry) if len(res) >= count { if len(msgs) > i+1 { nextCallId = msgs[i+1].id } break } } return nextCallId, res } func writeXautoclaim(c *server.Peer, nextCallId string, res []StreamEntry, justId bool) { c.WriteLen(2) c.WriteBulk(nextCallId) c.WriteLen(len(res)) for _, entry := range res { if justId { c.WriteBulk(entry.ID) continue } c.WriteLen(2) c.WriteBulk(entry.ID) c.WriteLen(len(entry.Values)) for _, v := range entry.Values { c.WriteBulk(v) } } } // XCLAIM func (m *Miniredis) cmdXclaim(c *server.Peer, cmd string, args []string) { if len(args) < 5 { setDirty(c) c.WriteError(errWrongNumber(cmd)) return } var opts struct { key string groupName string consumerName string minIdleTime time.Duration newLastDelivery time.Time ids []string retryCount *int force bool justId bool } opts.key, opts.groupName, opts.consumerName = args[0], args[1], args[2] minIdleTimeMillis, err := strconv.Atoi(args[3]) if err != nil { setDirty(c) c.WriteError("ERR Invalid min-idle-time argument for XCLAIM") return } opts.minIdleTime = time.Millisecond * time.Duration(minIdleTimeMillis) opts.newLastDelivery = m.effectiveNow() opts.ids = append(opts.ids, args[4]) args = args[5:] for len(args) > 0 { arg := strings.ToUpper(args[0]) if arg == "IDLE" || arg == "TIME" || arg == "RETRYCOUNT" || arg == "FORCE" || arg == "JUSTID" { break } opts.ids = append(opts.ids, arg) args = args[1:] } for len(args) > 0 { arg := strings.ToUpper(args[0]) switch arg { case "IDLE": idleMs, err := strconv.ParseInt(args[1], 10, 64) if err != nil { setDirty(c) c.WriteError("ERR Invalid IDLE option argument for XCLAIM") return } if idleMs < 0 { idleMs = 0 } opts.newLastDelivery = m.effectiveNow().Add(time.Millisecond * time.Duration(-idleMs)) args = args[2:] case "TIME": timeMs, err := strconv.ParseInt(args[1], 10, 64) if err != nil { setDirty(c) c.WriteError("ERR Invalid TIME option argument for XCLAIM") return } opts.newLastDelivery = unixMilli(timeMs) args = args[2:] case "RETRYCOUNT": retryCount, err := strconv.Atoi(args[1]) if err != nil { setDirty(c) c.WriteError("ERR Invalid RETRYCOUNT option argument for XCLAIM") return } opts.retryCount = &retryCount args = args[2:] case "FORCE": opts.force = true args = args[1:] case "JUSTID": opts.justId = true args = args[1:] default: setDirty(c) c.WriteError(fmt.Sprintf("ERR Unrecognized XCLAIM option '%s'", args[0])) return } } withTx(m, c, func(c *server.Peer, ctx *connCtx) { db := m.db(ctx.selectedDB) g, err := db.streamGroup(opts.key, opts.groupName) if err != nil { c.WriteError(err.Error()) return } if g == nil { c.WriteError(errReadgroup(opts.key, opts.groupName).Error()) return } claimedEntryIDs := m.xclaim(g, opts.consumerName, opts.minIdleTime, opts.newLastDelivery, opts.ids, opts.retryCount, opts.force) writeXclaim(c, g.stream, claimedEntryIDs, opts.justId) }) } func (m *Miniredis) xclaim( group *streamGroup, consumerName string, minIdleTime time.Duration, newLastDelivery time.Time, ids []string, retryCount *int, force bool, ) (claimedEntryIDs []string) { for _, id := range ids { pelPos, pelEntry := group.searchPending(id) if pelEntry == nil { if !force { continue } if pelPos < len(group.pending) { group.pending = append(group.pending[:pelPos+1], group.pending[pelPos:]...) } else { group.pending = append(group.pending, pendingEntry{}) } pelEntry = &group.pending[pelPos] *pelEntry = pendingEntry{ id: id, consumer: consumerName, deliveryCount: 1, } } else { group.consumers[pelEntry.consumer].numPendingEntries-- pelEntry.consumer = consumerName } if retryCount != nil { pelEntry.deliveryCount = *retryCount } else { pelEntry.deliveryCount++ } pelEntry.lastDelivery = newLastDelivery claimedEntryIDs = append(claimedEntryIDs, id) } if len(claimedEntryIDs) == 0 { return } if _, ok := group.consumers[consumerName]; !ok { group.consumers[consumerName] = &consumer{} } consumer := group.consumers[consumerName] consumer.numPendingEntries += len(claimedEntryIDs) return } func writeXclaim(c *server.Peer, stream *streamKey, claimedEntryIDs []string, justId bool) { c.WriteLen(len(claimedEntryIDs)) for _, id := range claimedEntryIDs { if justId { c.WriteBulk(id) continue } _, entry := stream.get(id) if entry == nil { c.WriteNull() continue } c.WriteLen(2) c.WriteBulk(entry.ID) c.WriteStrings(entry.Values) } } func parseBlock(cmd string, args []string, block *bool, timeout *time.Duration) error { if len(args) < 2 { return errors.New(errWrongNumber(cmd)) } (*block) = true ms, err := strconv.Atoi(args[1]) if err != nil { return errors.New(msgInvalidInt) } if ms < 0 { return errors.New("ERR timeout is negative") } (*timeout) = time.Millisecond * time.Duration(ms) return nil } // taken from Go's time package. Can be dropped if miniredis supports >= 1.17 func unixMilli(msec int64) time.Time { return time.Unix(msec/1e3, (msec%1e3)*1e6) }