From 4354bd988a3253577738d33889802b1e68e65c95 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 20 Jul 2023 11:56:34 +0200 Subject: [PATCH] Move new JetStream API docs to top level README (#1352) --- README.md | 98 +++++++++++++-------------------------------- legacy_jetstream.md | 83 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 71 deletions(-) create mode 100644 legacy_jetstream.md diff --git a/README.md b/README.md index ad839cb..bef056c 100644 --- a/README.md +++ b/README.md @@ -90,91 +90,47 @@ nc.Drain() nc.Close() ``` -## JetStream Basic Usage +## JetStream -> __NOTE__ -> -> We encourage you to try out a new, simplified version on JetStream API. -> The new API is currently in preview and is available under `jetstream` package. -> -> You can find more information on the new API [here](https://github.com/nats-io/nats.go/blob/main/jetstream/README.md) +JetStream is the built-in NATS persistence system. `nats.go` provides a built-in +API enabling both managing JetStream assets as well as publishing/consuming +persistent messages. + +### Basic usage ```go -import "github.com/nats-io/nats.go" - -// Connect to NATS +// connect to nats server nc, _ := nats.Connect(nats.DefaultURL) -// Create JetStream Context -js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256)) +// create jetstream context from nats connection +js, _ := jetstream.New(nc) -// Simple Stream Publisher -js.Publish("ORDERS.scratch", []byte("hello")) +ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) +defer cancel() -// Simple Async Stream Publisher -for i := 0; i < 500; i++ { - js.PublishAsync("ORDERS.scratch", []byte("hello")) -} -select { -case <-js.PublishAsyncComplete(): -case <-time.After(5 * time.Second): - fmt.Println("Did not resolve in time") -} +// get existing stream handle +stream, _ := js.Stream(ctx, "foo") -// Simple Async Ephemeral Consumer -js.Subscribe("ORDERS.*", func(m *nats.Msg) { - fmt.Printf("Received a JetStream message: %s\n", string(m.Data)) +// retrieve consumer handle from a stream +cons, _ := stream.Consumer(ctx, "cons") + +// consume messages from the consumer in callback +cc, _ := cons.Consume(func(msg jetstream.Msg) { + fmt.Println("Received jetstream message: ", string(msg.Data())) + msg.Ack() }) - -// Simple Sync Durable Consumer (optional SubOpts at the end) -sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3)) -m, err := sub.NextMsg(timeout) - -// Simple Pull Consumer -sub, err := js.PullSubscribe("ORDERS.*", "MONITOR") -msgs, err := sub.Fetch(10) - -// Unsubscribe -sub.Unsubscribe() - -// Drain -sub.Drain() +defer cc.Stop() ``` -## JetStream Basic Management +To find more information on `nats.go` JetStream API, visit +[`jetstream/README.md`](jetstream/README.md) -```go -import "github.com/nats-io/nats.go" +> The current JetStream API replaces the [legacy JetStream API](legacy_jetstream.md) -// Connect to NATS -nc, _ := nats.Connect(nats.DefaultURL) +## Service API -// Create JetStream Context -js, _ := nc.JetStream() - -// Create a Stream -js.AddStream(&nats.StreamConfig{ - Name: "ORDERS", - Subjects: []string{"ORDERS.*"}, -}) - -// Update a Stream -js.UpdateStream(&nats.StreamConfig{ - Name: "ORDERS", - MaxBytes: 8, -}) - -// Create a Consumer -js.AddConsumer("ORDERS", &nats.ConsumerConfig{ - Durable: "MONITOR", -}) - -// Delete Consumer -js.DeleteConsumer("ORDERS", "MONITOR") - -// Delete Stream -js.DeleteStream("ORDERS") -``` +The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The +services API is currently in beta release. ## Encoded Connections diff --git a/legacy_jetstream.md b/legacy_jetstream.md new file mode 100644 index 0000000..43e1c73 --- /dev/null +++ b/legacy_jetstream.md @@ -0,0 +1,83 @@ +# Legacy JetStream API + +This is a documentation for the legacy JetStream API. A README for the current +API can be found [here](jetstream/README.md) + +## JetStream Basic Usage + +```go +import "github.com/nats-io/nats.go" + +// Connect to NATS +nc, _ := nats.Connect(nats.DefaultURL) + +// Create JetStream Context +js, _ := nc.JetStream(nats.PublishAsyncMaxPending(256)) + +// Simple Stream Publisher +js.Publish("ORDERS.scratch", []byte("hello")) + +// Simple Async Stream Publisher +for i := 0; i < 500; i++ { + js.PublishAsync("ORDERS.scratch", []byte("hello")) +} +select { +case <-js.PublishAsyncComplete(): +case <-time.After(5 * time.Second): + fmt.Println("Did not resolve in time") +} + +// Simple Async Ephemeral Consumer +js.Subscribe("ORDERS.*", func(m *nats.Msg) { + fmt.Printf("Received a JetStream message: %s\n", string(m.Data)) +}) + +// Simple Sync Durable Consumer (optional SubOpts at the end) +sub, err := js.SubscribeSync("ORDERS.*", nats.Durable("MONITOR"), nats.MaxDeliver(3)) +m, err := sub.NextMsg(timeout) + +// Simple Pull Consumer +sub, err := js.PullSubscribe("ORDERS.*", "MONITOR") +msgs, err := sub.Fetch(10) + +// Unsubscribe +sub.Unsubscribe() + +// Drain +sub.Drain() +``` + +## JetStream Basic Management + +```go +import "github.com/nats-io/nats.go" + +// Connect to NATS +nc, _ := nats.Connect(nats.DefaultURL) + +// Create JetStream Context +js, _ := nc.JetStream() + +// Create a Stream +js.AddStream(&nats.StreamConfig{ + Name: "ORDERS", + Subjects: []string{"ORDERS.*"}, +}) + +// Update a Stream +js.UpdateStream(&nats.StreamConfig{ + Name: "ORDERS", + MaxBytes: 8, +}) + +// Create a Consumer +js.AddConsumer("ORDERS", &nats.ConsumerConfig{ + Durable: "MONITOR", +}) + +// Delete Consumer +js.DeleteConsumer("ORDERS", "MONITOR") + +// Delete Stream +js.DeleteStream("ORDERS") +```