mirror of
https://github.com/nats-io/nats.go.git
synced 2025-09-26 20:41:41 +08:00
[CHANGED] Remove no headers support (#1939)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:

committed by
Piotr Piotrowski

parent
ec9b58f895
commit
60f17186ec
3
.gitignore
vendored
3
.gitignore
vendored
@@ -21,6 +21,9 @@ _testmain.go
|
||||
|
||||
*.exe
|
||||
|
||||
# Git backup files
|
||||
*.orig
|
||||
|
||||
# Emacs
|
||||
*~
|
||||
\#*\#
|
||||
|
@@ -124,25 +124,6 @@ func WithGetMsgSubject(subject string) GetMsgOpt {
|
||||
}
|
||||
}
|
||||
|
||||
// WithGetMsgNoHeaders sets whether the message headers should be returned
|
||||
// in the response. If set to true, the headers will not be returned.
|
||||
// This is useful for performance reasons when headers are not needed.
|
||||
func WithGetMsgNoHeaders() GetMsgOpt {
|
||||
return func(req *apiMsgGetRequest) error {
|
||||
req.NoHeaders = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithGetLastForSubjectNoHeaders sets whether the message headers should be
|
||||
// returned in the response for the last message on a subject.
|
||||
func WithGetLastForSubjectNoHeaders() GetLastForSubjectOpt {
|
||||
return func(req *apiMsgGetRequest) error {
|
||||
req.NoHeaders = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PullMaxMessages limits the number of messages to be buffered in the client.
|
||||
// If not provided, a default of 500 messages will be used.
|
||||
// This option is exclusive with PullMaxBytes.
|
||||
|
@@ -225,10 +225,9 @@ type (
|
||||
GetLastForSubjectOpt func(*apiMsgGetRequest) error
|
||||
|
||||
apiMsgGetRequest struct {
|
||||
Seq uint64 `json:"seq,omitempty"`
|
||||
LastFor string `json:"last_by_subj,omitempty"`
|
||||
NextFor string `json:"next_by_subj,omitempty"`
|
||||
NoHeaders bool `json:"no_hdr,omitempty"`
|
||||
Seq uint64 `json:"seq,omitempty"`
|
||||
LastFor string `json:"last_by_subj,omitempty"`
|
||||
NextFor string `json:"next_by_subj,omitempty"`
|
||||
}
|
||||
|
||||
// apiMsgGetResponse is the response for a Stream get request.
|
||||
@@ -555,14 +554,14 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return convertDirectGetMsgResponseToMsg(r.msg, mreq.NoHeaders)
|
||||
return convertDirectGetMsgResponseToMsg(r.msg)
|
||||
}
|
||||
gmSubj = fmt.Sprintf(apiDirectMsgGetT, s.name)
|
||||
r, err := s.js.apiRequest(ctx, gmSubj, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return convertDirectGetMsgResponseToMsg(r.msg, mreq.NoHeaders)
|
||||
return convertDirectGetMsgResponseToMsg(r.msg)
|
||||
}
|
||||
req, err := json.Marshal(mreq)
|
||||
if err != nil {
|
||||
@@ -602,7 +601,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
|
||||
}, nil
|
||||
}
|
||||
|
||||
func convertDirectGetMsgResponseToMsg(r *nats.Msg, noHeaders bool) (*RawStreamMsg, error) {
|
||||
func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) {
|
||||
// Check for 404/408. We would get a no-payload message and a "Status" header
|
||||
if len(r.Data) == 0 {
|
||||
val := r.Header.Get(statusHdr)
|
||||
@@ -619,11 +618,7 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg, noHeaders bool) (*RawStreamMs
|
||||
}
|
||||
}
|
||||
}
|
||||
if noHeaders {
|
||||
return &RawStreamMsg{
|
||||
Data: r.Data,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Check for headers that give us the required information to
|
||||
// reconstruct the message.
|
||||
if len(r.Header) == 0 {
|
||||
|
@@ -1032,104 +1032,6 @@ func TestGetMsg(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMsgNoHeaders(t *testing.T) {
|
||||
srv := RunBasicJetStreamServer()
|
||||
defer shutdownJSServerAndRemoveStorage(t, srv)
|
||||
nc, err := nats.Connect(srv.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
sNonDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "NON_DIRECT", Subjects: []string{"FOO"}})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
sDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "DIRECT", Subjects: []string{"BAR"}})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range []jetstream.Stream{sNonDirect, sDirect} {
|
||||
if _, err := js.PublishMsg(context.Background(), &nats.Msg{
|
||||
Data: []byte("msg without headers"),
|
||||
Subject: s.CachedInfo().Config.Subjects[0],
|
||||
Header: nats.Header{
|
||||
"X-Nats-Test-Data": {"test_data"},
|
||||
"X-Nats-Key": {"123"},
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
msg, err := s.GetMsg(context.Background(), 1, jetstream.WithGetMsgNoHeaders())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "msg without headers" {
|
||||
t.Fatalf("Invalid message data; want: msg without headers; got: %s", string(msg.Data))
|
||||
}
|
||||
if len(msg.Header) > 0 {
|
||||
t.Fatalf("Expected no headers; got: %v", msg.Header)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLastMsgForSubjectNoHeaders(t *testing.T) {
|
||||
srv := RunBasicJetStreamServer()
|
||||
defer shutdownJSServerAndRemoveStorage(t, srv)
|
||||
nc, err := nats.Connect(srv.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
sNonDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "NON_DIRECT", Subjects: []string{"FOO"}})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
sDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "DIRECT", Subjects: []string{"BAR"}, AllowDirect: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for _, s := range []jetstream.Stream{sNonDirect, sDirect} {
|
||||
t.Run("stream "+s.CachedInfo().Config.Name, func(t *testing.T) {
|
||||
if _, err := js.PublishMsg(context.Background(), &nats.Msg{
|
||||
Data: []byte("msg without headers"),
|
||||
Subject: s.CachedInfo().Config.Subjects[0],
|
||||
Header: nats.Header{
|
||||
"X-Nats-Test-Data": {"test_data"},
|
||||
"X-Nats-Key": {"123"},
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
msg, err := s.GetLastMsgForSubject(context.Background(), s.CachedInfo().Config.Subjects[0], jetstream.WithGetLastForSubjectNoHeaders())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "msg without headers" {
|
||||
t.Fatalf("Invalid message data; want: msg without headers; got: %s", string(msg.Data))
|
||||
}
|
||||
if len(msg.Header) > 0 {
|
||||
t.Fatalf("Expected no headers; got: %v", msg.Header)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLastMsgForSubject(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
Reference in New Issue
Block a user