mirror of
https://github.com/nats-io/nats.go.git
synced 2025-09-26 20:41:41 +08:00
[IMPROVED] Add test veryfying if mirror can be updated to regular stream (#1934)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:

committed by
Piotr Piotrowski

parent
3416ed073b
commit
ec9b58f895
@@ -2403,3 +2403,78 @@ func TestCreateOrUpdateStreamCrossDomains(t *testing.T) {
|
||||
t.Fatalf("Expected subject to be updated to 'bar', got %q", info2.Config.Subjects[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestPromoteMirrorToStream(t *testing.T) {
|
||||
srv := RunBasicJetStreamServer()
|
||||
defer shutdownJSServerAndRemoveStorage(t, srv)
|
||||
nc, err := nats.Connect(srv.ClientURL())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
// Create source stream
|
||||
_, err = js.CreateStream(ctx, jetstream.StreamConfig{
|
||||
Name: "SOURCE",
|
||||
Subjects: []string{"foo"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Publish some messages to source
|
||||
for range 10 {
|
||||
if _, err := js.Publish(ctx, "foo", []byte("hello")); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create mirror stream
|
||||
mirror, err := js.CreateStream(ctx, jetstream.StreamConfig{
|
||||
Name: "MIRROR",
|
||||
Mirror: &jetstream.StreamSource{Name: "SOURCE"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Wait for mirror to catch up
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
info, err := mirror.Info(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.State.Msgs != 10 {
|
||||
return fmt.Errorf("not caught up")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Remove mirror configuration
|
||||
updatedMirror, err := js.UpdateStream(ctx, jetstream.StreamConfig{
|
||||
Name: "MIRROR",
|
||||
Subjects: []string{"bar"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
info, err := updatedMirror.Info(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if info.Config.Mirror != nil {
|
||||
t.Fatal("Expected mirror to be nil after promote")
|
||||
}
|
||||
|
||||
if info.Config.Subjects[0] != "bar" {
|
||||
t.Fatalf("Expected subject to be 'bar', got %q", info.Config.Subjects[0])
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user