[ADDED] Enable using non-kv streams as KV sources (#1960)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2025-12-17 13:32:26 +01:00
committed by GitHub
parent 7bb976de86
commit 34d93ec001
3 changed files with 198 additions and 80 deletions

View File

@@ -246,6 +246,11 @@ type (
Mirror *StreamSource `json:"mirror,omitempty"`
// Sources defines the configuration for sources of a KeyValue store.
// If no subject transforms are defined, it is assumed that a source is
// also a KV store and subject transforms will be set to correctly map
// keys from the source KV to the current one. If subject transforms are
// defined, they will be used as is. This allows using non-kv streams as
// sources.
Sources []*StreamSource `json:"sources,omitempty"`
// Compression sets the underlying stream compression.
@@ -471,7 +476,6 @@ const (
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)
const (
@@ -685,8 +689,14 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
// if subject transforms are already set, then use as is.
// this allows for full control of the source, e.g. using non-KV streams.
// Note that in this case, the Name is not modified and full stream name must be provided.
if len(ss.SubjectTransforms) > 0 {
scfg.Sources = append(scfg.Sources, ss)
continue
}
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]

View File

@@ -50,6 +50,11 @@ func TestKeyValueBasics(t *testing.T) {
if r != 1 {
t.Fatalf("Expected 1 for the revision, got %d", r)
}
// put, invalid key
_, err = kv.Put(ctx, ".invalid", []byte("value"))
expectErr(t, err, jetstream.ErrInvalidKey)
// Simple Get
e, err := kv.Get(ctx, "name")
expectOk(t, err)
@@ -60,6 +65,10 @@ func TestKeyValueBasics(t *testing.T) {
t.Fatalf("Expected 1 for the revision, got %d", e.Revision())
}
// get, invalid key
_, err = kv.Get(ctx, ".invalid")
expectErr(t, err, jetstream.ErrInvalidKey)
// Delete
err = kv.Delete(ctx, "name")
expectOk(t, err)
@@ -75,6 +84,10 @@ func TestKeyValueBasics(t *testing.T) {
err = kv.Delete(ctx, "name", jetstream.LastRevision(3))
expectOk(t, err)
// delete, invalid key
err = kv.Delete(ctx, ".invalid")
expectErr(t, err, jetstream.ErrInvalidKey)
// Conditional Updates.
r, err = kv.Update(ctx, "name", []byte("rip"), 4)
expectOk(t, err)
@@ -169,6 +182,10 @@ func TestCreateKeyValue(t *testing.T) {
// assert that we're backwards compatible
expectErr(t, err, jetstream.ErrStreamNameAlreadyInUse)
// invalid configs
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "NEW", History: 200})
expectErr(t, err, jetstream.ErrHistoryTooLarge)
}
func TestUpdateKeyValue(t *testing.T) {
@@ -2113,6 +2130,9 @@ func TestKeyValueListKeysDuplicates(t *testing.T) {
for i := range 5 {
key := fmt.Sprintf("key_%d", i)
if _, err := kv.PutString(ctx, key, "updated"); err != nil {
if errors.Is(err, nats.ErrConnectionClosed) {
return
}
t.Logf("Error updating key %s: %v", key, err)
}
}
@@ -2147,86 +2167,166 @@ func TestKeyValueListKeysDuplicates(t *testing.T) {
}
func TestKeyValueWithSources(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
t.Run("kv -> kv", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()
nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()
kv1, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCE1",
History: 5,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
kv2, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCE2",
History: 5,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
kv3, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCED",
History: 5,
Sources: []*jetstream.StreamSource{
{Name: "SOURCE1"},
{Name: "SOURCE2"},
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := kv1.Put(ctx, "key1", []byte("value1")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := kv2.Put(ctx, "key2", []byte("value2")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(200 * time.Millisecond)
val, err := kv3.Get(ctx, "key1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value1" {
t.Fatalf("Expected value1, got %s", string(val.Value()))
}
val, err = kv3.Get(ctx, "key2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value2" {
t.Fatalf("Expected value2, got %s", string(val.Value()))
}
stream, err := js.Stream(ctx, "KV_SOURCED")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
info, err := stream.Info(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(info.Config.Sources) != 2 {
t.Fatalf("Expected 2 sources, got %d", len(info.Config.Sources))
}
for _, src := range info.Config.Sources {
if len(src.SubjectTransforms) != 1 {
t.Fatalf("Expected 1 subject transform, got %d", len(src.SubjectTransforms))
kv1, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCE1",
History: 5,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
kv2, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCE2",
History: 5,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
kv3, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCED",
History: 5,
Sources: []*jetstream.StreamSource{
{Name: "SOURCE1"},
// for the second one, pass the prefix, should still work
{Name: "KV_SOURCE2"},
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := kv1.Put(ctx, "key1", []byte("value1")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := kv2.Put(ctx, "key2", []byte("value2")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(200 * time.Millisecond)
val, err := kv3.Get(ctx, "key1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value1" {
t.Fatalf("Expected value1, got %s", string(val.Value()))
}
val, err = kv3.Get(ctx, "key2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value2" {
t.Fatalf("Expected value2, got %s", string(val.Value()))
}
stream, err := js.Stream(ctx, "KV_SOURCED")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
info, err := stream.Info(ctx)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if len(info.Config.Sources) != 2 {
t.Fatalf("Expected 2 sources, got %d", len(info.Config.Sources))
}
for _, src := range info.Config.Sources {
if len(src.SubjectTransforms) != 1 {
t.Fatalf("Expected 1 subject transform, got %d", len(src.SubjectTransforms))
}
}
})
t.Run("streams -> kv", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()
_, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "SOURCE1",
Subjects: []string{"S1.>"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: "SOURCE2",
Subjects: []string{"S2.>"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "SOURCED",
History: 5,
Sources: []*jetstream.StreamSource{
{
Name: "SOURCE1",
SubjectTransforms: []jetstream.SubjectTransformConfig{
{
Source: "S1.>",
Destination: "$KV.SOURCED.>",
},
},
},
{
Name: "SOURCE2",
SubjectTransforms: []jetstream.SubjectTransformConfig{
{
Source: "S2.>",
Destination: "$KV.SOURCED.>",
},
},
},
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish(ctx, "S1.key1", []byte("value1")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish(ctx, "S2.key2", []byte("value2")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
time.Sleep(200 * time.Millisecond)
val, err := kv.Get(ctx, "key1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value1" {
t.Fatalf("Expected value1, got %s", string(val.Value()))
}
val, err = kv.Get(ctx, "key2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if string(val.Value()) != "value2" {
t.Fatalf("Expected value2, got %s", string(val.Value()))
}
})
}
func TestKeyValueGetRevision(t *testing.T) {
@@ -2265,6 +2365,15 @@ func TestKeyValueGetRevision(t *testing.T) {
if entry.Revision() != 1 {
t.Fatalf("Expected revision 1, got %d", entry.Revision())
}
if err := kv.Delete(ctx, "key"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
_, err = kv.GetRevision(ctx, "key", 3)
if !errors.Is(err, jetstream.ErrKeyNotFound) {
t.Fatalf("Expected ErrKeyNotFound, got %v", err)
}
}
func TestKeyValueStatusBytes(t *testing.T) {

1
kv.go
View File

@@ -354,7 +354,6 @@ const (
kvSubjectsTmpl = "$KV.%s.>"
kvSubjectsPreTmpl = "$KV.%s."
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
kvNoPending = "0"
)
// Regex for valid keys and buckets.