[ADDED] WithExpectLastSequenceForSubject publish option (#1920)

This utilizes the Nats-Expected-Last-Subject-Sequence-Subject header
that became available in 2.11.0.

Signed-off-by: Byron Ruth <byron@nats.io>
This commit is contained in:
Byron Ruth
2025-08-16 06:16:54 -04:00
committed by GitHub
parent 7a260b8b93
commit 22f2b9dbce
4 changed files with 199 additions and 3 deletions

View File

@@ -587,6 +587,21 @@ func WithExpectLastSequencePerSubject(seq uint64) PublishOpt {
}
}
// WithExpectLastSequenceForSubject sets the sequence and subject for which the
// last sequence number should be checked. If the last message on a subject
// has a different sequence number server will reject the message and publish
// will fail.
func WithExpectLastSequenceForSubject(seq uint64, subject string) PublishOpt {
return func(opts *pubOpts) error {
if subject == "" {
return fmt.Errorf("%w: subject cannot be empty", ErrInvalidOption)
}
opts.lastSubjectSeq = &seq
opts.lastSubject = subject
return nil
}
}
// WithExpectLastMsgID sets the expected message ID the last message on a stream
// should have. If the last message has a different message ID server will
// reject the message and publish will fail.

View File

@@ -191,6 +191,12 @@ const (
// [WithExpectLastSequencePerSubject] option.
ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"
// ExpectedLastSubjSeqSubjHeader contains the subject for which the
// expected last sequence number is set. This is used together with
// [ExpectedLastSubjSeqHeader] to apply optimistic concurrency control at
// subject level. Server will reject the message if it is not the case.
ExpectedLastSubjSeqSubjHeader = "Nats-Expected-Last-Subject-Sequence-Subject"
// ExpectedLastMsgIDHeader contains the expected last message ID on the
// subject and can be used to apply optimistic concurrency control at
// stream level. Server will reject the message if it is not the case.

View File

@@ -47,7 +47,8 @@ type (
lastMsgID string // Expected last msgId
stream string // Expected stream name
lastSeq *uint64 // Expected last sequence
lastSubjectSeq *uint64 // Expected last sequence per subject
lastSubjectSeq *uint64 // Expected last sequence for subject
lastSubject string // Expected subject for last sequence
ttl time.Duration // Message TTL
// Publish retries for NoResponders err.
@@ -195,6 +196,10 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.ttl > 0 {
m.Header.Set(MsgTTLHeader, o.ttl.String())
}
@@ -281,6 +286,10 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.ttl > 0 {
m.Header.Set(MsgTTLHeader, o.ttl.String())
}

View File

@@ -364,6 +364,89 @@ func TestPublishMsg(t *testing.T) {
},
},
},
{
name: "expect last sequence for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithExpectLastSequenceForSubject(2, "FOO.2")},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 3,
},
expectedHeaders: nats.Header{
"Nats-Expected-Last-Subject-Sequence": []string{"2"},
"Nats-Expected-Last-Subject-Sequence-Subject": []string{"FOO.2"},
},
},
},
},
{
name: "invalid last sequence for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithExpectLastSequenceForSubject(123, "FOO.2")},
withError: func(t *testing.T, err error) {
var apiErr *jetstream.APIError
if ok := errors.As(err, &apiErr); !ok {
t.Fatalf("Expected API error; got: %v", err)
}
if apiErr.ErrorCode != 10071 {
t.Fatalf("Expected error code: 10071; got: %d", apiErr.ErrorCode)
}
},
},
},
},
{
name: "expect stream header",
msgs: []publishConfig{
@@ -548,7 +631,7 @@ func TestPublishMsg(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64})
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 128})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -1185,6 +1268,89 @@ func TestPublishMsgAsync(t *testing.T) {
},
},
},
{
name: "expect last sequence for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithExpectLastSequenceForSubject(2, "FOO.2")},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 3,
},
expectedHeaders: nats.Header{
"Nats-Expected-Last-Subject-Sequence": []string{"2"},
"Nats-Expected-Last-Subject-Sequence-Subject": []string{"FOO.2"},
},
},
},
},
{
name: "invalid last sequence for subject",
msgs: []publishConfig{
{
msg: &nats.Msg{
Data: []byte("msg 1"),
Subject: "FOO.1",
},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 1,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 2"),
Subject: "FOO.2",
},
opts: []jetstream.PublishOpt{},
expectedAck: jetstream.PubAck{
Stream: "foo",
Sequence: 2,
},
},
{
msg: &nats.Msg{
Data: []byte("msg 3"),
Subject: "FOO.1",
},
opts: []jetstream.PublishOpt{jetstream.WithExpectLastSequenceForSubject(123, "FOO.2")},
withAckError: func(t *testing.T, err error) {
var apiErr *jetstream.APIError
if ok := errors.As(err, &apiErr); !ok {
t.Fatalf("Expected API error; got: %v", err)
}
if apiErr.ErrorCode != 10071 {
t.Fatalf("Expected error code: 10071; got: %d", apiErr.ErrorCode)
}
},
},
},
},
{
name: "expect stream header",
msgs: []publishConfig{
@@ -1382,7 +1548,7 @@ func TestPublishMsgAsync(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64})
_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 128})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}