// Copyright 2020 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package nats import ( "bytes" "context" "encoding/json" "errors" "fmt" "net/http" "strconv" "strings" "time" ) // JetStream is the public interface for the JetStream context. type JetStream interface { // Publishing messages to JetStream. Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) // Subscribing to messages in JetStream. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // Channel versions. ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // QueueSubscribe. QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // Management // Create a stream. AddStream(cfg *StreamConfig) (*StreamInfo, error) // Create a consumer. AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) // Stream information. StreamInfo(stream string) (*StreamInfo, error) // TODO(dlc) - add more } // APIError is included in all API responses if there was an error. type APIError struct { Code int `json:"code"` Description string `json:"description,omitempty"` } // APIResponse is a standard response from the JetStream JSON API type APIResponse struct { Type string `json:"type"` Error *APIError `json:"error,omitempty"` } type AccountInfoResponse struct { APIResponse *AccountStats } // AccountLimits is for the information about type AccountLimits struct { MaxMemory int64 `json:"max_memory"` MaxStore int64 `json:"max_storage"` MaxStreams int `json:"max_streams"` MaxConsumers int `json:"max_consumers"` } // AccountStats returns current statistics about the account's JetStream usage. type AccountStats struct { Memory uint64 `json:"memory"` Store uint64 `json:"storage"` Streams int `json:"streams"` Limits AccountLimits `json:"limits"` } // Internal struct for jetstream type js struct { nc *Conn // For importing JetStream from other accounts. pre string // Amount of time to wait for API requests. wait time.Duration // Signals only direct access and no API access. direct bool } // Request API subjects for JetStream. const ( JSDefaultAPIPrefix = "$JS.API." // JSApiAccountInfo is for obtaining general information about JetStream. JSApiAccountInfo = "INFO" // JSApiStreams can lookup a stream by subject. JSApiStreams = "STREAM.NAMES" // JSApiConsumerCreateT is used to create consumers. JSApiConsumerCreateT = "CONSUMER.CREATE.%s" // JSApiDurableCreateT is used to create durable consumers. JSApiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" // JSApiConsumerInfoT is used to create consumers. JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s" // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" // JSApiStreamCreateT is the endpoint to create new streams. JSApiStreamCreateT = "STREAM.CREATE.%s" // JSApiStreamInfoT is the endpoint to get information on a stream. JSApiStreamInfoT = "STREAM.INFO.%s" ) // JetStream returns a JetStream context for pub/sub interactions. func (nc *Conn) JetStream(opts ...JSOpt) (JetStream, error) { const defaultRequestWait = 5 * time.Second js := &js{nc: nc, pre: JSDefaultAPIPrefix, wait: defaultRequestWait} for _, f := range opts { if err := f(js); err != nil { return nil, err } } if js.direct { return js, nil } resp, err := nc.Request(js.apiSubj(JSApiAccountInfo), nil, js.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } return nil, err } var info AccountInfoResponse if err := json.Unmarshal(resp.Data, &info); err != nil { return nil, err } if info.Error != nil && info.Error.Code == 503 { return nil, ErrJetStreamNotEnabled } return js, nil } // JSOpt configures options for the jetstream context. type JSOpt func(opts *js) error func APIPrefix(pre string) JSOpt { return func(js *js) error { js.pre = pre if !strings.HasSuffix(js.pre, ".") { js.pre = js.pre + "." } return nil } } func APIRequestWait(wait time.Duration) JSOpt { return func(js *js) error { js.wait = wait return nil } } func DirectOnly() JSOpt { return func(js *js) error { js.direct = true return nil } } func (js *js) apiSubj(subj string) string { if js.pre == _EMPTY_ { return subj } var b strings.Builder b.WriteString(js.pre) b.WriteString(subj) return b.String() } // PubOpt configures options for publishing jetstream messages. type PubOpt func(opts *pubOpts) error type pubOpts struct { ctx context.Context ttl time.Duration id string lid string // Expected last msgId str string // Expected stream name seq uint64 // Expected last sequence } type PubAckResponse struct { APIResponse *PubAck } type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` } // Headers for published messages. const ( MsgIdHdr = "Nats-Msg-Id" ExpectedStreamHdr = "Nats-Expected-Stream" ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" ) func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { var o pubOpts if len(opts) > 0 { if m.Header == nil { m.Header = http.Header{} } for _, f := range opts { if err := f(&o); err != nil { return nil, err } } } // Check for option collisions. Right now just timeout and context. if o.ctx != nil && o.ttl != 0 { return nil, ErrContextAndTimeout } if o.ttl == 0 && o.ctx == nil { o.ttl = js.wait } if o.id != _EMPTY_ { m.Header.Set(MsgIdHdr, o.id) } if o.lid != _EMPTY_ { m.Header.Set(ExpectedLastMsgIdHdr, o.lid) } if o.str != _EMPTY_ { m.Header.Set(ExpectedStreamHdr, o.str) } if o.seq > 0 { m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) } var resp *Msg var err error if o.ttl > 0 { resp, err = js.nc.RequestMsg(m, o.ttl) } else { resp, err = js.nc.RequestMsgWithContext(o.ctx, m) } if err != nil { if err == ErrNoResponders { err = ErrNoStreamResponse } return nil, err } var pa PubAckResponse if err := json.Unmarshal(resp.Data, &pa); err != nil { return nil, ErrInvalidJSAck } if pa.Error != nil { return nil, errors.New(pa.Error.Description) } if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { return nil, ErrInvalidJSAck } return pa.PubAck, nil } func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) { return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...) } // Options for publishing to JetStream. // MsgId sets the message ID used for de-duplication. func MsgId(id string) PubOpt { return func(opts *pubOpts) error { opts.id = id return nil } } // ExpectStream sets the expected stream to respond from the publish. func ExpectStream(stream string) PubOpt { return func(opts *pubOpts) error { opts.str = stream return nil } } // ExpectLastSequence sets the expected sequence in the response from the publish. func ExpectLastSequence(seq uint64) PubOpt { return func(opts *pubOpts) error { opts.seq = seq return nil } } // ExpectLastSequence sets the expected sequence in the response from the publish. func ExpectLastMsgId(id string) PubOpt { return func(opts *pubOpts) error { opts.lid = id return nil } } // MaxWait sets the maximum amount of time we will wait for a response from JetStream. func MaxWait(ttl time.Duration) PubOpt { return func(opts *pubOpts) error { opts.ttl = ttl return nil } } // Context sets the contect to make the call to JetStream. func Context(ctx context.Context) PubOpt { return func(opts *pubOpts) error { opts.ctx = ctx return nil } } // Subscribe // We will match subjects to streams and consumers on the user's behalf. type JSApiCreateConsumerRequest struct { Stream string `json:"stream_name"` Config *ConsumerConfig `json:"config"` } type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` } type JSApiConsumerResponse struct { APIResponse *ConsumerInfo } type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` Created time.Time `json:"created"` Config ConsumerConfig `json:"config"` Delivered SequencePair `json:"delivered"` AckFloor SequencePair `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` } type SequencePair struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` } // NextRequest is for getting next messages for pull based consumers. type NextRequest struct { Expires *time.Time `json:"expires,omitempty"` Batch int `json:"batch,omitempty"` NoWait bool `json:"no_wait,omitempty"` } // SubOpt configures options for subscribing to JetStream consumers. type SubOpt func(opts *subOpts) error // Subscribe will create a subscription to the appropriate stream and consumer. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, cb, nil, opts) } // SubscribeSync will create a sync subscription to the appropriate stream and consumer. func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, opts) } // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, cb, nil, opts) } // Subscribe will create a subscription to the appropriate stream and consumer. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, opts) } // APIPaged includes variables used to create paged responses from the JSON API type APIPaged struct { Total int `json:"total"` Offset int `json:"offset"` Limit int `json:"limit"` } type streamRequest struct { Subject string `json:"subject,omitempty"` } type JSApiStreamNamesResponse struct { APIResponse APIPaged Streams []string `json:"streams"` } func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} if len(opts) > 0 { for _, f := range opts { if err := f(&o); err != nil { return nil, err } } } isPullMode := o.pull > 0 if cb != nil && isPullMode { return nil, ErrPullModeNotAllowed } var err error var stream, deliver string var ccfg *ConsumerConfig // If we are attaching to an existing consumer. shouldAttach := o.stream != _EMPTY_ && o.consumer != _EMPTY_ || o.cfg.DeliverSubject != _EMPTY_ shouldCreate := !shouldAttach if js.direct && shouldCreate { return nil, ErrDirectModeRequired } if js.direct { if o.cfg.DeliverSubject != _EMPTY_ { deliver = o.cfg.DeliverSubject } else { deliver = NewInbox() } } else if shouldAttach { info, err := js.getConsumerInfo(o.stream, o.consumer) if err != nil { return nil, err } ccfg = &info.Config // Make sure this new subject matches or is a subset. if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject { return nil, ErrSubjectMismatch } if ccfg.DeliverSubject != _EMPTY_ { deliver = ccfg.DeliverSubject } else { deliver = NewInbox() } } else { stream, err = js.lookupStreamBySubject(subj) if err != nil { return nil, err } deliver = NewInbox() if !isPullMode { cfg.DeliverSubject = deliver } // Do filtering always, server will clear as needed. cfg.FilterSubject = subj } var sub *Subscription // Check if we are manual ack. if cb != nil && !o.mack { ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) if err != nil { return nil, err } // If we are creating or updating let's process that request. if shouldCreate { // If not set default to ack explicit. if cfg.AckPolicy == ackPolicyNotSet { cfg.AckPolicy = AckExplicit } // If we have acks at all and the MaxAckPending is not set go ahead // and set to the internal max. // TODO(dlc) - We should be able to update this if client updates PendingLimits. if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNone { maxMsgs, _, _ := sub.PendingLimits() cfg.MaxAckPending = maxMsgs } req := &JSApiCreateConsumerRequest{ Stream: stream, Config: &cfg, } j, err := json.Marshal(req) if err != nil { return nil, err } var ccSubj string if cfg.Durable != _EMPTY_ { ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable) } else { ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream) } resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } sub.Unsubscribe() return nil, err } var info JSApiConsumerResponse err = json.Unmarshal(resp.Data, &info) if err != nil { sub.Unsubscribe() return nil, err } if info.Error != nil { sub.Unsubscribe() return nil, errors.New(info.Error.Description) } // Hold onto these for later. sub.jsi.stream = info.Stream sub.jsi.consumer = info.Name sub.jsi.deliver = info.Config.DeliverSubject } else { sub.jsi.stream = o.stream sub.jsi.consumer = o.consumer if js.direct { sub.jsi.deliver = o.cfg.DeliverSubject } else { sub.jsi.deliver = ccfg.DeliverSubject } } // If we are pull based go ahead and fire off the first request to populate. if isPullMode { sub.jsi.pull = o.pull sub.Poll() } return sub, nil } func (js *js) lookupStreamBySubject(subj string) (string, error) { var slr JSApiStreamNamesResponse // FIXME(dlc) - prefix req := &streamRequest{subj} j, err := json.Marshal(req) if err != nil { return _EMPTY_, err } resp, err := js.nc.Request(js.apiSubj(JSApiStreams), j, js.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } return _EMPTY_, err } if err := json.Unmarshal(resp.Data, &slr); err != nil { return _EMPTY_, err } if slr.Error != nil || len(slr.Streams) != 1 { return _EMPTY_, ErrNoMatchingStream } return slr.Streams[0], nil } type subOpts struct { // For attaching. stream, consumer string // For pull based consumers, batch size for pull pull int // For manual ack mack bool // For creating or updating. cfg *ConsumerConfig } func Durable(name string) SubOpt { return func(opts *subOpts) error { opts.cfg.Durable = name return nil } } func Attach(stream, consumer string) SubOpt { return func(opts *subOpts) error { opts.stream = stream opts.consumer = consumer return nil } } func Pull(batchSize int) SubOpt { return func(opts *subOpts) error { if batchSize == 0 { return errors.New("nats: batch size of 0 not valid") } opts.pull = batchSize return nil } } func PullDirect(stream, consumer string, batchSize int) SubOpt { return func(opts *subOpts) error { if batchSize == 0 { return errors.New("nats: batch size of 0 not valid") } opts.stream = stream opts.consumer = consumer opts.pull = batchSize return nil } } func PushDirect(deliverSubject string) SubOpt { return func(opts *subOpts) error { opts.cfg.DeliverSubject = deliverSubject return nil } } func ManualAck() SubOpt { return func(opts *subOpts) error { opts.mack = true return nil } } func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. if sub.jsi.consumer == _EMPTY_ { sub.mu.Unlock() return nil, ErrTypeSubscription } js := sub.jsi.js stream, consumer := sub.jsi.stream, sub.jsi.consumer sub.mu.Unlock() return js.getConsumerInfo(stream, consumer) } func (sub *Subscription) Poll() error { sub.mu.Lock() if sub.jsi == nil || sub.jsi.deliver != _EMPTY_ || sub.jsi.pull == 0 { sub.mu.Unlock() return ErrTypeSubscription } batch := sub.jsi.pull nc, reply := sub.conn, sub.Subject stream, consumer := sub.jsi.stream, sub.jsi.consumer js := sub.jsi.js sub.mu.Unlock() req, _ := json.Marshal(&NextRequest{Batch: batch}) reqNext := js.apiSubj(fmt.Sprintf(JSApiRequestNextT, stream, consumer)) return nc.PublishRequest(reqNext, reply, req) } func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { // FIXME(dlc) - prefix ccInfoSubj := fmt.Sprintf(JSApiConsumerInfoT, stream, consumer) resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } return nil, err } var info JSApiConsumerResponse if err := json.Unmarshal(resp.Data, &info); err != nil { return nil, err } if info.Error != nil { return nil, errors.New(info.Error.Description) } return info.ConsumerInfo, nil } func (m *Msg) checkReply() (*js, bool, error) { if m.Reply == "" { return nil, false, ErrMsgNoReply } if m == nil || m.Sub == nil { return nil, false, ErrMsgNotBound } sub := m.Sub sub.mu.Lock() if sub.jsi == nil { sub.mu.Unlock() return nil, false, ErrNotJSMessage } js := sub.jsi.js isPullMode := sub.jsi.pull > 0 sub.mu.Unlock() return js, isPullMode, nil } // ackReply handles all acks. Will do the right thing for pull and sync mode. func (m *Msg) ackReply(ackType []byte, sync bool) error { js, isPullMode, err := m.checkReply() if err != nil { return err } if isPullMode { if bytes.Equal(ackType, AckAck) { err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext) } else if bytes.Equal(ackType, AckNak) || bytes.Equal(ackType, AckTerm) { err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}")) } if sync && err == nil { _, err = js.nc.Request(m.Reply, nil, js.wait) } } else if sync { _, err = js.nc.Request(m.Reply, ackType, js.wait) } else { err = js.nc.Publish(m.Reply, ackType) } return err } // Acks for messages // Ack a message, this will do the right thing with pull based consumers. func (m *Msg) Ack() error { return m.ackReply(AckAck, false) } // Ack a message and wait for a response from the server. func (m *Msg) AckSync() error { return m.ackReply(AckAck, true) } // Nak this message, indicating we can not process. func (m *Msg) Nak() error { return m.ackReply(AckNak, false) } // Term this message from ever being delivered regardless of MaxDeliverCount. func (m *Msg) Term() error { return m.ackReply(AckTerm, false) } // Indicate that this message is being worked on and reset redelkivery timer in the server. func (m *Msg) InProgress() error { return m.ackReply(AckProgress, false) } // JetStream metadata associated with received messages. type MsgMetaData struct { Consumer uint64 Stream uint64 Delivered uint64 Pending uint64 Timestamp time.Time } func (m *Msg) MetaData() (*MsgMetaData, error) { if _, _, err := m.checkReply(); err != nil { return nil, err } const expectedTokens = 9 const btsep = '.' tsa := [expectedTokens]string{} start, tokens := 0, tsa[:0] subject := m.Reply for i := 0; i < len(subject); i++ { if subject[i] == btsep { tokens = append(tokens, subject[start:i]) start = i + 1 } } tokens = append(tokens, subject[start:]) if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return nil, ErrNotJSMessage } meta := &MsgMetaData{ Delivered: uint64(parseNum(tokens[4])), Stream: uint64(parseNum(tokens[5])), Consumer: uint64(parseNum(tokens[6])), Timestamp: time.Unix(0, parseNum(tokens[7])), Pending: uint64(parseNum(tokens[8])), } return meta, nil } // Quick parser for positive numbers in ack reply encoding. func parseNum(d string) (n int64) { if len(d) == 0 { return -1 } // Ascii numbers 0-9 const ( asciiZero = 48 asciiNine = 57 ) for _, dec := range d { if dec < asciiZero || dec > asciiNine { return -1 } n = n*10 + (int64(dec) - asciiZero) } return n } // Additional jetstream structures. type AckPolicy int const ( AckNone AckPolicy = iota AckAll AckExplicit // For setting ackPolicyNotSet = 99 ) func jsonString(s string) string { return "\"" + s + "\"" } func (p *AckPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("none"): *p = AckNone case jsonString("all"): *p = AckAll case jsonString("explicit"): *p = AckExplicit default: return fmt.Errorf("can not unmarshal %q", data) } return nil } func (p AckPolicy) MarshalJSON() ([]byte, error) { switch p { case AckNone: return json.Marshal("none") case AckAll: return json.Marshal("all") case AckExplicit: return json.Marshal("explicit") default: return nil, fmt.Errorf("unknown acknowlegement policy %v", p) } } func (p AckPolicy) String() string { switch p { case AckNone: return "AckNone" case AckAll: return "AckAll" case AckExplicit: return "AckExplicit" case ackPolicyNotSet: return "Not Initialized" default: return "Unknown AckPolicy" } } type ReplayPolicy int const ( ReplayInstant ReplayPolicy = iota ReplayOriginal ) func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("instant"): *p = ReplayInstant case jsonString("original"): *p = ReplayOriginal default: return fmt.Errorf("can not unmarshal %q", data) } return nil } func (p ReplayPolicy) MarshalJSON() ([]byte, error) { switch p { case ReplayOriginal: return json.Marshal("original") case ReplayInstant: return json.Marshal("instant") default: return nil, fmt.Errorf("unknown replay policy %v", p) } } var ( AckAck = []byte("+ACK") AckNak = []byte("-NAK") AckProgress = []byte("+WPI") AckNext = []byte("+NXT") AckTerm = []byte("+TERM") ) type DeliverPolicy int const ( DeliverAll DeliverPolicy = iota DeliverLast DeliverNew DeliverByStartSequence DeliverByStartTime ) func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("all"), jsonString("undefined"): *p = DeliverAll case jsonString("last"): *p = DeliverLast case jsonString("new"): *p = DeliverNew case jsonString("by_start_sequence"): *p = DeliverByStartSequence case jsonString("by_start_time"): *p = DeliverByStartTime } return nil } func (p DeliverPolicy) MarshalJSON() ([]byte, error) { switch p { case DeliverAll: return json.Marshal("all") case DeliverLast: return json.Marshal("last") case DeliverNew: return json.Marshal("new") case DeliverByStartSequence: return json.Marshal("by_start_sequence") case DeliverByStartTime: return json.Marshal("by_start_time") default: return nil, fmt.Errorf("unknown deliver policy %v", p) } } // Management for JetStream // TODO(dlc) - Fill this out. // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { if stream == _EMPTY_ { return nil, ErrStreamNameRequired } req, err := json.Marshal(&JSApiCreateConsumerRequest{Stream: stream, Config: cfg}) if err != nil { return nil, err } var ccSubj string if cfg.Durable != _EMPTY_ { ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable) } else { ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream) } resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.wait) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled } return nil, err } var info JSApiConsumerResponse err = json.Unmarshal(resp.Data, &info) if err != nil { return nil, err } if info.Error != nil { return nil, errors.New(info.Error.Description) } return info.ConsumerInfo, nil } // StreamConfig will determine the properties for a stream. // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { Name string `json:"name"` Subjects []string `json:"subjects,omitempty"` Retention RetentionPolicy `json:"retention"` MaxConsumers int `json:"max_consumers"` MaxMsgs int64 `json:"max_msgs"` MaxBytes int64 `json:"max_bytes"` Discard DiscardPolicy `json:"discard"` MaxAge time.Duration `json:"max_age"` MaxMsgSize int32 `json:"max_msg_size,omitempty"` Storage StorageType `json:"storage"` Replicas int `json:"num_replicas"` NoAck bool `json:"no_ack,omitempty"` Template string `json:"template_owner,omitempty"` Duplicates time.Duration `json:"duplicate_window,omitempty"` } // JSApiStreamCreateResponse stream creation. type JSApiStreamCreateResponse struct { APIResponse *StreamInfo } func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { if cfg == nil || cfg.Name == _EMPTY_ { return nil, ErrStreamNameRequired } req, err := json.Marshal(cfg) if err != nil { return nil, err } csSubj := js.apiSubj(fmt.Sprintf(JSApiStreamCreateT, cfg.Name)) r, err := js.nc.Request(csSubj, req, js.wait) if err != nil { return nil, err } var resp JSApiStreamCreateResponse if err := json.Unmarshal(r.Data, &resp); err != nil { return nil, err } if resp.Error != nil { return nil, errors.New(resp.Error.Description) } return resp.StreamInfo, nil } type JSApiStreamInfoResponse = JSApiStreamCreateResponse func (js *js) StreamInfo(stream string) (*StreamInfo, error) { csSubj := js.apiSubj(fmt.Sprintf(JSApiStreamInfoT, stream)) r, err := js.nc.Request(csSubj, nil, js.wait) if err != nil { return nil, err } var resp JSApiStreamInfoResponse if err := json.Unmarshal(r.Data, &resp); err != nil { return nil, err } if resp.Error != nil { return nil, errors.New(resp.Error.Description) } return resp.StreamInfo, nil } // StreamInfo shows config and current state for this stream. type StreamInfo struct { Config StreamConfig `json:"config"` Created time.Time `json:"created"` State StreamState `json:"state"` } // StreamStats is information about the given stream. type StreamState struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` FirstTime time.Time `json:"first_ts"` LastSeq uint64 `json:"last_seq"` LastTime time.Time `json:"last_ts"` Consumers int `json:"consumer_count"` } // RetentionPolicy determines how messages in a set are retained. type RetentionPolicy int const ( // LimitsPolicy (default) means that messages are retained until any given limit is reached. // This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy ) // Discard Policy determines how we proceed when limits of messages or bytes are hit. The default, DicscardOld will // remove older messages. DiscardNew will fail to store the new message. type DiscardPolicy int const ( // DiscardOld will remove older messages to return to the limits. DiscardOld = iota //DiscardNew will error on a StoreMsg call DiscardNew ) const ( limitsPolicyString = "limits" interestPolicyString = "interest" workQueuePolicyString = "workqueue" ) func (rp RetentionPolicy) String() string { switch rp { case LimitsPolicy: return "Limits" case InterestPolicy: return "Interest" case WorkQueuePolicy: return "WorkQueue" default: return "Unknown Retention Policy" } } func (rp RetentionPolicy) MarshalJSON() ([]byte, error) { switch rp { case LimitsPolicy: return json.Marshal(limitsPolicyString) case InterestPolicy: return json.Marshal(interestPolicyString) case WorkQueuePolicy: return json.Marshal(workQueuePolicyString) default: return nil, fmt.Errorf("can not marshal %v", rp) } } func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString(limitsPolicyString): *rp = LimitsPolicy case jsonString(interestPolicyString): *rp = InterestPolicy case jsonString(workQueuePolicyString): *rp = WorkQueuePolicy default: return fmt.Errorf("can not unmarshal %q", data) } return nil } func (dp DiscardPolicy) String() string { switch dp { case DiscardOld: return "DiscardOld" case DiscardNew: return "DiscardNew" default: return "Unknown Discard Policy" } } func (dp DiscardPolicy) MarshalJSON() ([]byte, error) { switch dp { case DiscardOld: return json.Marshal("old") case DiscardNew: return json.Marshal("new") default: return nil, fmt.Errorf("can not marshal %v", dp) } } func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error { switch strings.ToLower(string(data)) { case jsonString("old"): *dp = DiscardOld case jsonString("new"): *dp = DiscardNew default: return fmt.Errorf("can not unmarshal %q", data) } return nil } // StorageType determines how messages are stored for retention. type StorageType int const ( // MemoryStorage specifies in memory only. MemoryStorage StorageType = iota // FileStorage specifies on disk, designated by the JetStream config StoreDir. FileStorage ) const ( memoryStorageString = "memory" fileStorageString = "file" ) func (st StorageType) String() string { switch st { case MemoryStorage: return strings.Title(memoryStorageString) case FileStorage: return strings.Title(fileStorageString) default: return "Unknown Storage Type" } } func (st StorageType) MarshalJSON() ([]byte, error) { switch st { case MemoryStorage: return json.Marshal(memoryStorageString) case FileStorage: return json.Marshal(fileStorageString) default: return nil, fmt.Errorf("can not marshal %v", st) } } func (st *StorageType) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString(memoryStorageString): *st = MemoryStorage case jsonString(fileStorageString): *st = FileStorage default: return fmt.Errorf("can not unmarshal %q", data) } return nil }