Files
nats.go/examples/nats-bench/main.go
Piotr Piotrowski b868e2a2d6 Update dates in licence headers (#1445)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
2023-10-17 18:44:27 +02:00

198 lines
4.7 KiB
Go

// Copyright 2015-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/bench"
)
// Some sane defaults
const (
DefaultNumMsgs = 100000
DefaultNumPubs = 1
DefaultNumSubs = 0
DefaultMessageSize = 128
)
func usage() {
log.Printf("Usage: nats-bench [-s server (%s)] [--tls] [-np NUM_PUBLISHERS] [-ns NUM_SUBSCRIBERS] [-n NUM_MSGS] [-ms MESSAGE_SIZE] [-csv csvfile] [-creds file] [-nkey file] <subject>\n", nats.DefaultURL)
flag.PrintDefaults()
}
func showUsageAndExit(exitcode int) {
usage()
os.Exit(exitcode)
}
var benchmark *bench.Benchmark
func main() {
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var tls = flag.Bool("tls", false, "Use TLS Secure Connection")
var numPubs = flag.Int("np", DefaultNumPubs, "Number of Concurrent Publishers")
var numSubs = flag.Int("ns", DefaultNumSubs, "Number of Concurrent Subscribers")
var numMsgs = flag.Int("n", DefaultNumMsgs, "Number of Messages to Publish")
var msgSize = flag.Int("ms", DefaultMessageSize, "Size of the message.")
var csvFile = flag.String("csv", "", "Save bench data to csv file")
var userCreds = flag.String("creds", "", "User Credentials File")
var nkeyFile = flag.String("nkey", "", "NKey Seed File")
var showHelp = flag.Bool("h", false, "Show help message")
log.SetFlags(0)
flag.Usage = usage
flag.Parse()
if *showHelp {
showUsageAndExit(0)
}
args := flag.Args()
if len(args) != 1 {
showUsageAndExit(1)
}
if *numMsgs <= 0 {
log.Fatal("Number of messages should be greater than zero.")
}
// Connect Options.
opts := []nats.Option{nats.Name("NATS Benchmark")}
if *userCreds != "" && *nkeyFile != "" {
log.Fatal("specify -seed or -creds")
}
// Use UserCredentials
if *userCreds != "" {
opts = append(opts, nats.UserCredentials(*userCreds))
}
// Use Nkey authentication.
if *nkeyFile != "" {
opt, err := nats.NkeyOptionFromSeed(*nkeyFile)
if err != nil {
log.Fatal(err)
}
opts = append(opts, opt)
}
// Use TLS specified
if *tls {
opts = append(opts, nats.Secure(nil))
}
benchmark = bench.NewBenchmark("NATS", *numSubs, *numPubs)
var startwg sync.WaitGroup
var donewg sync.WaitGroup
donewg.Add(*numPubs + *numSubs)
// Run Subscribers first
startwg.Add(*numSubs)
for i := 0; i < *numSubs; i++ {
nc, err := nats.Connect(*urls, opts...)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
defer nc.Close()
go runSubscriber(nc, &startwg, &donewg, *numMsgs, *msgSize)
}
startwg.Wait()
// Now Publishers
startwg.Add(*numPubs)
pubCounts := bench.MsgsPerClient(*numMsgs, *numPubs)
for i := 0; i < *numPubs; i++ {
nc, err := nats.Connect(*urls, opts...)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
defer nc.Close()
go runPublisher(nc, &startwg, &donewg, pubCounts[i], *msgSize)
}
log.Printf("Starting benchmark [msgs=%d, msgsize=%d, pubs=%d, subs=%d]\n", *numMsgs, *msgSize, *numPubs, *numSubs)
startwg.Wait()
donewg.Wait()
benchmark.Close()
fmt.Print(benchmark.Report())
if len(*csvFile) > 0 {
csv := benchmark.CSV()
os.WriteFile(*csvFile, []byte(csv), 0644)
fmt.Printf("Saved metric data in csv file %s\n", *csvFile)
}
}
func runPublisher(nc *nats.Conn, startwg, donewg *sync.WaitGroup, numMsgs int, msgSize int) {
startwg.Done()
args := flag.Args()
subj := args[0]
var msg []byte
if msgSize > 0 {
msg = make([]byte, msgSize)
}
start := time.Now()
for i := 0; i < numMsgs; i++ {
nc.Publish(subj, msg)
}
nc.Flush()
benchmark.AddPubSample(bench.NewSample(numMsgs, msgSize, start, time.Now(), nc))
donewg.Done()
}
func runSubscriber(nc *nats.Conn, startwg, donewg *sync.WaitGroup, numMsgs int, msgSize int) {
args := flag.Args()
subj := args[0]
received := 0
ch := make(chan time.Time, 2)
sub, _ := nc.Subscribe(subj, func(msg *nats.Msg) {
received++
if received == 1 {
ch <- time.Now()
}
if received >= numMsgs {
ch <- time.Now()
}
})
sub.SetPendingLimits(-1, -1)
nc.Flush()
startwg.Done()
start := <-ch
end := <-ch
benchmark.AddSubSample(bench.NewSample(numMsgs, msgSize, start, end, nc))
nc.Close()
donewg.Done()
}