# Add package :
- config server, cluster, logger, user & permission
- config client & builder
- interface for managing server, cluster and client
- test for nats cluster / client producer & subscriber

# Update packages : 
- pkg errors : constant to add package nats
- pkg logger : add function to read options
- pkg ioutils : add function to check / create file & path

# Dependancies : 
- Bump dependancies
- Add dependancies : 
  - github.com/nats-io/nats-server/v2
  - github.com/nats-io/nats.go
This commit is contained in:
Nicolas JUHEL
2021-05-06 12:56:53 +02:00
committed by GitHub
parent 115485e8a3
commit efd2784ac9
10 changed files with 2905 additions and 38 deletions

View File

@@ -40,14 +40,18 @@ const (
MinPkgMail = 1100 MinPkgMail = 1100
MinPkgMailer = 1200 MinPkgMailer = 1200
MinPkgNetwork = 1300 MinPkgNetwork = 1300
MinPkgNutsDB = 1400 MinPkgNats = 1400
MinPkgOAuth = 1500 MinPkgNutsDB = 1500
MinPkgAws = 1600 MinPkgOAuth = 1600
MinPkgRouter = 1700 MinPkgAws = 1700
MinPkgSemaphore = 1800 MinPkgRouter = 1800
MinPkgSMTP = 1900 MinPkgSemaphore = 1900
MinPkgStatic = 2000 MinPkgSMTP = 2000
MinPkgVersion = 2100 MinPkgStatic = 2100
MinPkgVersion = 2200
MIN_AVAILABLE = 4000 MinAvailable = 4000
// MIN_AVAILABLE @Deprecated use MinAvailable constant
MIN_AVAILABLE = MinAvailable
) )

38
go.mod
View File

@@ -10,17 +10,18 @@ require (
github.com/PuerkitoBio/goquery v1.6.1 // indirect github.com/PuerkitoBio/goquery v1.6.1 // indirect
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/VictoriaMetrics/metrics v1.17.2 // indirect github.com/VictoriaMetrics/metrics v1.17.2 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/andybalholm/cascadia v1.2.0 // indirect github.com/andybalholm/cascadia v1.2.0 // indirect
github.com/aokoli/goutils v1.1.1 // indirect github.com/aokoli/goutils v1.1.1 // indirect
github.com/armon/go-metrics v0.3.7 // indirect github.com/armon/go-metrics v0.3.8 // indirect
github.com/aws/aws-sdk-go-v2 v1.3.3 github.com/aws/aws-sdk-go-v2 v1.3.4
github.com/aws/aws-sdk-go-v2/config v1.1.6 github.com/aws/aws-sdk-go-v2/config v1.1.6
github.com/aws/aws-sdk-go-v2/credentials v1.1.6 github.com/aws/aws-sdk-go-v2/credentials v1.1.6
github.com/aws/aws-sdk-go-v2/service/iam v1.3.1 github.com/aws/aws-sdk-go-v2/service/iam v1.3.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.5.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.5.0
github.com/c-bata/go-prompt v0.2.6 github.com/c-bata/go-prompt v0.2.6
github.com/cockroachdb/errors v1.8.4 // indirect github.com/cockroachdb/errors v1.8.4 // indirect
github.com/cockroachdb/pebble v0.0.0-20210414141038-bee0c60e96bc // indirect github.com/cockroachdb/pebble v0.0.0-20210503173641-1387689d3d7c // indirect
github.com/cockroachdb/redact v1.0.9 // indirect github.com/cockroachdb/redact v1.0.9 // indirect
github.com/fatih/color v1.10.0 github.com/fatih/color v1.10.0
github.com/fxamacker/cbor/v2 v2.2.0 github.com/fxamacker/cbor/v2 v2.2.0
@@ -42,17 +43,17 @@ require (
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.8 github.com/hashicorp/go-retryablehttp v0.7.0
github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 github.com/hashicorp/go-uuid v1.0.2
github.com/hashicorp/go-version v1.3.0 github.com/hashicorp/go-version v1.3.0
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.2.3 // indirect github.com/hashicorp/memberlist v0.2.4 // indirect
github.com/huandu/xstrings v1.3.2 // indirect github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect github.com/imdario/mergo v0.3.12 // indirect
github.com/jaytaylor/html2text v0.0.0-20200412013138-3577fbdbcff7 // indirect github.com/jaytaylor/html2text v0.0.0-20200412013138-3577fbdbcff7 // indirect
github.com/json-iterator/go v1.1.10 // indirect github.com/json-iterator/go v1.1.11 // indirect
github.com/klauspost/compress v1.12.1 // indirect github.com/klauspost/compress v1.12.2 // indirect
github.com/kr/pretty v0.2.1 // indirect github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect
@@ -60,30 +61,35 @@ require (
github.com/matcornic/hermes/v2 v2.1.0 github.com/matcornic/hermes/v2 v2.1.0
github.com/mattn/go-runewidth v0.0.12 // indirect github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/miekg/dns v1.1.41 // indirect github.com/miekg/dns v1.1.41 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/mitchellh/copystructure v1.1.2 // indirect github.com/mitchellh/copystructure v1.1.2 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.0.2
github.com/nats-io/nats-server/v2 v2.2.2
github.com/nats-io/nats.go v1.11.0
github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo v1.16.1 github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.11.0 github.com/onsi/gomega v1.11.0
github.com/rivo/uniseg v0.2.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible github.com/shirou/gopsutil v3.21.4+incompatible
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/jwalterweatherman v1.1.0
github.com/ugorji/go v1.2.5 // indirect github.com/ugorji/go v1.2.5 // indirect
github.com/vanng822/go-premailer v1.20.1 // indirect github.com/vanng822/go-premailer v1.20.1 // indirect
github.com/vbauerster/mpb/v5 v5.4.0 github.com/vbauerster/mpb/v5 v5.4.0
github.com/xanzy/go-gitlab v0.48.0 github.com/xanzy/go-gitlab v0.49.0
github.com/xhit/go-simple-mail v2.2.2+incompatible github.com/xhit/go-simple-mail v2.2.2+incompatible
github.com/xujiajun/nutsdb v0.6.0 github.com/xujiajun/nutsdb v0.6.0
github.com/xujiajun/utils v0.0.0-20190123093513-8bf096c4f53b github.com/xujiajun/utils v0.0.0-20190123093513-8bf096c4f53b
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b // indirect golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e // indirect
golang.org/x/exp v0.0.0-20210417010653-0739314eea07 // indirect golang.org/x/exp v0.0.0-20210503015746-b3083d562e1d // indirect
golang.org/x/net v0.0.0-20210421230115-4e50805a0758 golang.org/x/net v0.0.0-20210505024714-0287a6fb4125
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210423082822-04245dca01da // indirect golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6 // indirect
golang.org/x/term v0.0.0-20210422114643-f5beecf764ed golang.org/x/term v0.0.0-20210503060354-a79de5458b56
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
) )

52
ioutils/tools.go Normal file
View File

@@ -0,0 +1,52 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package ioutils
import (
"errors"
"os"
"path/filepath"
)
func PathCheckCreate(isFile bool, path string, permFile os.FileMode, permDir os.FileMode) error {
if _, err := os.Stat(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
if !isFile {
return os.MkdirAll(path, permDir)
} else if e := PathCheckCreate(false, filepath.Dir(path), permFile, permDir); e != nil {
return e
} else if hf, err := os.Create(path); err != nil {
return err
} else {
_ = hf.Close()
}
return os.Chmod(path, permFile)
}

View File

@@ -65,20 +65,16 @@ func init() {
// GetLogger return a golang log.logger instance linked with this main logger. // GetLogger return a golang log.logger instance linked with this main logger.
// This function is useful to keep the format, mode, color, output... same as current config. // This function is useful to keep the format, mode, color, output... same as current config.
/* // - msgPrefixPattern a pattern prefix to identify or comment all message passed throw this log.logger instance.
msgPrefixPattern a pattern prefix to identify or comment all message passed throw this log.logger instance // - msgPrefixArgs a list of interface to apply on pattern with a fmt function.
msgPrefixArgs a list of interface to apply on pattern with a fmt function
*/
func GetLogger(lvl Level, logFlags int, msgPrefixPattern string, msgPrefixArgs ...interface{}) *log.Logger { func GetLogger(lvl Level, logFlags int, msgPrefixPattern string, msgPrefixArgs ...interface{}) *log.Logger {
return log.New(GetIOWriter(lvl, msgPrefixPattern, msgPrefixArgs...), "", logFlags) return log.New(GetIOWriter(lvl, msgPrefixPattern, msgPrefixArgs...), "", logFlags)
} }
// GetLogger force the default golang log.logger instance linked with this main logger. // SetStdLogger force the default golang log.logger instance linked with this main logger.
// This function is useful to keep the format, mode, color, output... same as current config. // This function is useful to keep the format, mode, color, output... same as current config.
/* // - msgPrefixPattern a pattern prefix to identify or comment all message passed throw this log.logger instance.
msgPrefixPattern a pattern prefix to identify or comment all message passed throw this log.logger instance // - msgPrefixArgs a list of interface to apply on pattern with a fmt function.
msgPrefixArgs a list of interface to apply on pattern with a fmt function
*/
func SetStdLogger(lvl Level, logFlags int, msgPrefixPattern string, msgPrefixArgs ...interface{}) { func SetStdLogger(lvl Level, logFlags int, msgPrefixPattern string, msgPrefixArgs ...interface{}) {
log.SetOutput(GetIOWriter(lvl, msgPrefixPattern, msgPrefixArgs...)) log.SetOutput(GetIOWriter(lvl, msgPrefixPattern, msgPrefixArgs...))
log.SetPrefix("") log.SetPrefix("")
@@ -95,6 +91,11 @@ func Timestamp(enable bool) {
timestamp = enable timestamp = enable
} }
// IsTimeStamp will return true if timestamp is added or not on log message
func IsTimeStamp() bool {
return timestamp
}
// FileTrace Reconfigure the current logger to add or not the origin file/line of each message. // FileTrace Reconfigure the current logger to add or not the origin file/line of each message.
// This option is apply for all message except info message. // This option is apply for all message except info message.
func FileTrace(enable bool) { func FileTrace(enable bool) {
@@ -102,18 +103,35 @@ func FileTrace(enable bool) {
setViperLogTrace() setViperLogTrace()
} }
// IsFileTrace will return true if trace is added or not on log message
func IsFileTrace() bool {
return filetrace
}
// ModeColor will reconfigure the current logger to use or not color in messages format.
// This apply only for next message and only for TextFormat.
func ModeColor(enable bool) {
modeColor = enable
updateFormatter(nilFormat)
}
// IsModeColor will return true if color is configured on log message
func IsModeColor() bool {
return modeColor
}
// EnableColor Reconfigure the current logger to use color in messages format. // EnableColor Reconfigure the current logger to use color in messages format.
// This apply only for next message and only for TextFormat. // This apply only for next message and only for TextFormat.
// @deprecated use ModeColor(true)
func EnableColor() { func EnableColor() {
modeColor = true ModeColor(true)
updateFormatter(nilFormat)
} }
// DisableColor Reconfigure the current logger to not use color in messages format. // DisableColor Reconfigure the current logger to not use color in messages format.
// This apply only for next message and only for TextFormat. // This apply only for next message and only for TextFormat.
// @deprecated use Color(false)
func DisableColor() { func DisableColor() {
modeColor = false ModeColor(false)
updateFormatter(nilFormat)
} }
// EnableViperLog enable or not the Gin Logger configuration. // EnableViperLog enable or not the Gin Logger configuration.

290
nats/client.go Normal file
View File

@@ -0,0 +1,290 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package nats
import (
"fmt"
"time"
"github.com/go-playground/validator/v10"
libtls "github.com/nabbar/golib/certificates"
liberr "github.com/nabbar/golib/errors"
natcli "github.com/nats-io/nats.go"
)
type Client struct {
// Url represents a single NATS server url to which the client
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
Url string
// Servers is a configured set of servers which this client
// will use when attempting to connect.
Servers []string
// NoRandomize configures whether we will randomize the
// server pool.
NoRandomize bool
// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
NoEcho bool
// Name is an optional name label which will be sent to the server
// on CONNECT to identify the client.
Name string
// Verbose signals the server to send an OK ack for commands
// successfully processed by the server.
Verbose bool
// Pedantic signals the server whether it should be doing further
// validation of subjects.
Pedantic bool
// AllowReconnect enables reconnection logic to be used when we
// encounter a disconnect from the current server.
AllowReconnect bool
// MaxReconnect sets the number of reconnect attempts that will be
// tried before giving up. If negative, then it will never give up
// trying to reconnect.
MaxReconnect int
// ReconnectWait sets the time to backoff after attempting a reconnect
// to a server that we were already connected to previously.
ReconnectWait time.Duration
// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitter time.Duration
// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Note that any jitter is capped with ReconnectJitterMax.
ReconnectJitterTLS time.Duration
// Timeout sets the timeout for a Dial operation on a connection.
Timeout time.Duration
// DrainTimeout sets the timeout for a Drain Operation to complete.
DrainTimeout time.Duration
// FlusherTimeout is the maximum time to wait for write operations
// to the underlying connection to complete (including the flusher loop).
FlusherTimeout time.Duration
// PingInterval is the period at which the client will be sending ping
// commands to the server, disabled if 0 or negative.
PingInterval time.Duration
// MaxPingsOut is the maximum number of pending ping commands that can
// be awaiting a response before raising an ErrStaleConnection error.
MaxPingsOut int
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
ReconnectBufSize int
// SubChanLen is the size of the buffered channel used between the socket
// Go routine and the message delivery for SyncSubscriptions.
// NOTE: This does not affect AsyncSubscriptions which are
// dictated by PendingLimits()
SubChanLen int
// User sets the username to be used when connecting to the server.
User string
// Password sets the password to be used when connecting to a server.
Password string
// Token sets the token to be used when connecting to a server.
Token string
// UseOldRequestStyle forces the old method of Requests that utilize
// a new Inbox and a new Subscription for each request.
UseOldRequestStyle bool
// NoCallbacksAfterClientClose allows preventing the invocation of
// callbacks after Close() is called. Client won't receive notifications
// when Close is invoked by user code. Default is to invoke the callbacks.
NoCallbacksAfterClientClose bool
// Secure enables TLS secure connections that skip server
// verification by default. NOT RECOMMENDED.
Secure bool
// TLSConfig is a custom TLS configuration to use for secure
// transports.
TLSConfig libtls.Config
}
func (c Client) Validate() liberr.Error {
val := validator.New()
err := val.Struct(c)
if e, ok := err.(*validator.InvalidValidationError); ok {
return ErrorConfigValidation.ErrorParent(e)
}
out := ErrorConfigValidation.Error(nil)
for _, e := range err.(validator.ValidationErrors) {
//nolint goerr113
out.AddParent(fmt.Errorf("config field '%s' is not validated by constraint '%s'", e.Field(), e.ActualTag()))
}
if out.HasParent() {
return out
}
return nil
}
func (c Client) NewClient(defTls libtls.TLSConfig) (*natcli.Conn, liberr.Error) {
opts := natcli.GetDefaultOptions()
if c.Url != "" {
opts.Url = c.Url
}
if len(c.Servers) > 0 {
opts.Servers = make([]string, 0)
for _, s := range c.Servers {
if s != "" {
opts.Servers = append(opts.Servers, s)
}
}
}
if c.NoRandomize {
opts.NoRandomize = true
}
if c.NoEcho {
opts.NoEcho = true
}
if c.Name != "" {
opts.Name = c.Name
}
if c.Verbose {
opts.Verbose = true
}
if c.Pedantic {
opts.Pedantic = true
}
if c.AllowReconnect {
opts.AllowReconnect = true
}
if c.MaxReconnect > 0 {
opts.MaxReconnect = c.MaxReconnect
}
if c.ReconnectWait > 0 {
opts.ReconnectWait = c.ReconnectWait
}
if c.ReconnectJitter > 0 {
opts.ReconnectJitter = c.ReconnectJitter
}
if c.ReconnectJitterTLS > 0 {
opts.ReconnectJitterTLS = c.ReconnectJitterTLS
}
if c.Timeout > 0 {
opts.Timeout = c.Timeout
}
if c.DrainTimeout > 0 {
opts.DrainTimeout = c.DrainTimeout
}
if c.FlusherTimeout > 0 {
opts.FlusherTimeout = c.FlusherTimeout
}
if c.PingInterval > 0 {
opts.PingInterval = c.PingInterval
}
if c.MaxPingsOut > 0 {
opts.MaxPingsOut = c.MaxPingsOut
}
if c.ReconnectBufSize > 0 {
opts.ReconnectBufSize = c.ReconnectBufSize
}
if c.SubChanLen > 0 {
opts.SubChanLen = c.SubChanLen
}
if c.User != "" {
opts.User = c.User
}
if c.Password != "" {
opts.Password = c.Password
}
if c.Token != "" {
opts.Token = c.Token
}
if c.UseOldRequestStyle {
opts.UseOldRequestStyle = true
}
if c.NoCallbacksAfterClientClose {
opts.NoCallbacksAfterClientClose = true
}
if c.Secure {
if t, e := c.TLSConfig.NewFrom(defTls); e != nil {
return nil, e
} else {
opts.TLSConfig = t.TlsConfig("")
}
opts.Secure = true
}
if n, e := opts.Connect(); e != nil {
return nil, ErrorClientConnect.ErrorParent(e)
} else {
return n, nil
}
}

1109
nats/config.go Normal file

File diff suppressed because it is too large Load Diff

631
nats/configPart.go Normal file
View File

@@ -0,0 +1,631 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package nats
import (
"net/url"
"os"
"time"
natsrv "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/jwt/v2"
libtls "github.com/nabbar/golib/certificates"
)
type ConfigCustom struct {
AccountResolver natsrv.AccountResolver `mapstructure:"-" json:"-" yaml:"-" toml:"-"`
AccountResolverTLS bool `mapstructure:"-" json:"-" yaml:"-" toml:"-"`
AccountResolverTLSConfig libtls.Config `mapstructure:"-" json:"-" yaml:"-" toml:"-"`
CustomClientAuthentication natsrv.Authentication `mapstructure:"-" json:"-" yaml:"-" toml:"-"`
CustomRouterAuthentication natsrv.Authentication `mapstructure:"-" json:"-" yaml:"-" toml:"-"`
}
// ConfigNkey is for multiple nkey based users.
type ConfigNkey struct {
//Nkey is a new challenge introduced by NATS v2 (ED25519 keys).
Nkey string `mapstructure:"user" json:"user" yaml:"user" toml:"user"`
//Account define the account associated to this NKey.
Account string `mapstructure:"account" json:"account" yaml:"account" toml:"account"`
// SigningKey define the ED 25519 signingKey.
SigningKey string `mapstructure:"signing_key" json:"signing_key" yaml:"signing_key" toml:"signing_key"`
//AllowedConnectionTypes define a list of allowed connection, in list of : STANDARD, WEBSOCKET, LEAFNODE, MQTT.
AllowedConnectionTypes []string `mapstructure:"connection_types" json:"connection_types" yaml:"connection_types" toml:"connection_types"`
}
// ConfigUser is for multiple accounts/users.
type ConfigUser struct {
//Username is the username used for connection.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used for connection.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//Account define the account associated to this NKey.
Account string `mapstructure:"account" json:"account" yaml:"account" toml:"account"`
//AllowedConnectionTypes define a list of allowed connection, in list of : STANDARD, WEBSOCKET, LEAFNODE, MQTT.
AllowedConnectionTypes []string `mapstructure:"connection_types" json:"connection_types" yaml:"connection_types" toml:"connection_types"`
}
// ConfigPermissionsUser are the allowed subjects on a per publish or subscribe basis.
type ConfigPermissionsUser struct {
//Publish define the scope permission for publisher role.
Publish ConfigPermissionSubject `mapstructure:"publish" json:"publish" yaml:"publish" toml:"publish"`
//Subscribe define the scope permission for subscriber role.
Subscribe ConfigPermissionSubject `mapstructure:"subscribe" json:"subscribe" yaml:"subscribe" toml:"subscribe"`
//Response define the scope permission to allow response for a message.
Response ConfigPermissionResponse `mapstructure:"response" json:"response" yaml:"response" toml:"response"`
}
// ConfigPermissionsRoute are similar to user permissions but describe what a server can import/export from and to another server.
type ConfigPermissionsRoute struct {
//Import define the scope permission to import data from the route.
Import ConfigPermissionSubject `mapstructure:"import" json:"import" yaml:"import" toml:"import"`
//Export define the scope permission to export data to the route.
Export ConfigPermissionSubject `mapstructure:"export" json:"export" yaml:"export" toml:"export"`
}
// ConfigPermissionResponse can be used to allow responses to any reply subject that is received on a valid subscription.
type ConfigPermissionResponse struct {
//MaxMsgs define the maximum message response in the expire duration.
MaxMsgs int `mapstructure:"max_msgs" json:"max_msgs" yaml:"max_msgs" toml:"max_msgs"`
//Expires define the TTL of the limitation for max messages.
Expires time.Duration `mapstructure:"expires" json:"expires" yaml:"expires" toml:"expires"`
}
// ConfigPermissionSubject is an individual allow and deny struct for publish and subscribe authorizations.
type ConfigPermissionSubject struct {
//Allow define the allowed scope for permission.
Allow []string `mapstructure:"allow" json:"allow" yaml:"allow" toml:"allow"`
//Deny define the deny scope for permission.
Deny []string `mapstructure:"deny" json:"deny" yaml:"deny" toml:"deny"`
}
// ConfigAccount are subject namespace definitions. By default no messages are shared between accounts.
// You can share via Exports and Imports of Streams and Services.
type ConfigAccount struct {
//Name define the name of the account.
Name string `mapstructure:"name" json:"name" yaml:"name" toml:"name"`
Permission ConfigPermissionsUser `mapstructure:"permission" json:"permission" yaml:"permission" toml:"permission"`
}
type ConfigAuth struct {
//NKeys Set the nkeys list with account.
NKeys []ConfigNkey `mapstructure:"nkeys" json:"nkeys" yaml:"nkeys" toml:"nkeys"`
//Users Set the users list with account.
Users []ConfigUser `mapstructure:"users" json:"users" yaml:"users" toml:"users"`
//Account Set the account list with permissions.
Accounts []ConfigAccount `mapstructure:"accounts" json:"accounts" yaml:"accounts" toml:"accounts"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
//NoAuthUser allows you to refer to a configured user/account when no credentials are provided.
NoAuthUser string `mapstructure:"no_auth_user" json:"no_auth_user" yaml:"no_auth_user" toml:"no_auth_user"`
//SystemAccount define the account under which nats-server offer services.
SystemAccount string `mapstructure:"system_account" json:"system_account" yaml:"system_account" toml:"system_account"`
//NoSystemAccount disable the system account.
NoSystemAccount bool `mapstructure:"no_system_account" json:"no_system_account" yaml:"no_system_account" toml:"no_system_account"`
//AllowNewAccounts define whether or not new accounts can be created on the fly.
AllowNewAccounts bool `mapstructure:"allow_new_accounts" json:"allow_new_accounts" yaml:"allow_new_accounts" toml:"allow_new_accounts"`
//TrustedKeys define the list of trusted keys allowed to operate the server
TrustedKeys []string `mapstructure:"trusted_keys" json:"trusted_keys" yaml:"trusted_keys" toml:"trusted_keys"`
//TrustedOperators define a list of jwt file for operator claim.
TrustedOperators []string `mapstructure:"trusted_operators" json:"trusted_operators" yaml:"trusted_operators" toml:"trusted_operators"`
}
type ConfigLogger struct {
//LogFile define the file to store log output.
LogFile string `mapstructure:"log_file" json:"log_file" yaml:"log_file" toml:"log_file"`
// PermissionFolderLogFile is the permission apply if a folder is created
PermissionFolderLogFile os.FileMode `mapstructure:"permission_folder" json:"permission_folder" yaml:"permission_folder" toml:"permission_folder"`
// PermissionFileLogFile is the permission apply if a file is created
PermissionFileLogFile os.FileMode `mapstructure:"permission_file" json:"permission_file" yaml:"permission_file" toml:"permission_file"`
//Syslog define if log output must be sent to syslog.
Syslog bool `mapstructure:"syslog" json:"syslog" yaml:"syslog" toml:"syslog"`
//RemoteSyslog define the syslog server address like '(udp://127.0.0.1:514)'.
RemoteSyslog string `mapstructure:"remote_syslog" json:"remote_syslog" yaml:"remote_syslog" toml:"remote_syslog"`
//LogSizeLimit define the maximum size allowed for the log file.
LogSizeLimit int64 `mapstructure:"log_size_limit" json:"log_size_limit" yaml:"log_size_limit" toml:"log_size_limit"`
//MaxTracedMsgLen define the max size in chars of trace message
MaxTracedMsgLen int `mapstructure:"max_traced_msg_len" json:"max_traced_msg_len" yaml:"max_traced_msg_len" toml:"max_traced_msg_len"`
// ConnectErrorReports specifies the number of failed attempts at which point server should report the failure of an initial connection to a route, gateway or leaf node.
// See DEFAULT_CONNECT_ERROR_REPORTS for default value.
ConnectErrorReports int `mapstructure:"connect_error_reports" json:"connect_error_reports" yaml:"connect_error_reports" toml:"connect_error_reports"`
// ReconnectErrorReports is similar to ConnectErrorReports except that this applies to reconnect events.
ReconnectErrorReports int `mapstructure:"reconnect_error_reports" json:"reconnect_error_reports" yaml:"reconnect_error_reports" toml:"reconnect_error_reports"`
}
type ConfigLimits struct {
//MaxConn Set maximum connection.
MaxConn int `mapstructure:"max_conn" json:"max_conn" yaml:"max_conn" toml:"max_conn"`
//MaxSubs Set the maximum subscriptions.
MaxSubs int `mapstructure:"max_subs" json:"max_subs" yaml:"max_subs" toml:"max_subs"`
//PingInterval define a duration between 2 ping with cluster.
PingInterval time.Duration `mapstructure:"ping_interval" json:"ping_interval" yaml:"ping_interval" toml:"ping_interval"`
//MaxPingsOut define the number of ping error before closing connection.
MaxPingsOut int `mapstructure:"port" json:"max_pings_out" yaml:"port" toml:"port"`
//MaxControlLine define the maximum allowed protocol control line size.
MaxControlLine int `mapstructure:"max_control_line" json:"max_control_line" yaml:"max_control_line" toml:"portmax_control_line"`
//MaxPayload define the maximum allowed payload size.
MaxPayload int `mapstructure:"max_payload" json:"max_payload" yaml:"max_payload" toml:"max_payload"`
//MaxPending define the maximum outbound pending bytes per client.
MaxPending int64 `mapstructure:"max_pending" json:"max_pending" yaml:"max_pending" toml:"max_pending"`
//WriteDeadline define the deadline timeout to flush line of stream message.
WriteDeadline time.Duration `mapstructure:"write_deadline" json:"write_deadline" yaml:"write_deadline" toml:"write_deadline"`
//MaxClosedClients define the number of closed clients connection to keep.
MaxClosedClients int `mapstructure:"max_closed_clients" json:"max_closed_clients" yaml:"max_closed_clients" toml:"max_closed_clients"`
//LameDuckDuration define the timeout to closing all client in LameDuck mode with signal ldm.
LameDuckDuration time.Duration `mapstructure:"lame_duck_duration" json:"lame_duck_duration" yaml:"lame_duck_duration" toml:"lame_duck_duration"`
//LameDuckGracePeriod define the grace period before closing client connection for LameDuck shutdown mode.
LameDuckGracePeriod time.Duration `mapstructure:"lame_duck_grace_period" json:"lame_duck_grace_period" yaml:"lame_duck_grace_period" toml:"lame_duck_grace_period"`
//NoSublistCache define the option to disable subscription caches for all accounts.
//This is saves resources in situations where different subjects are used all the time.
NoSublistCache bool `mapstructure:"no_sublist_cache" json:"no_sublist_cache" yaml:"no_sublist_cache" toml:"no_sublist_cache"`
//NoHeaderSupport define the option to disable header in the server.
//No use except for nats.js, nats.ws, nats.deno and docker image nighty build.
NoHeaderSupport bool `mapstructure:"no_header_support" json:"no_header_support" yaml:"no_header_support" toml:"no_header_support"`
//DisableShortFirstPing define the option to disable the very first PING to a lower interval to capture the initial RTT.
// After that the PING interval will be set to the user defined value.
DisableShortFirstPing bool `mapstructure:"disable_short_first_ping" json:"disable_short_first_ping" yaml:"disable_short_first_ping" toml:"disable_short_first_ping"`
}
type ConfigSrv struct {
//Name define the name of the server.
Name string `mapstructure:"name" json:"name" yaml:"name" toml:"name"`
//Host define the network host to listen on.
Host string `mapstructure:"host" json:"host" yaml:"host" toml:"host"`
//Port define the network port to listen on.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//ClientAdvertise is an alternative client listen specification <host>:<port> or just <host> to
//advertise to clients and other server. Useful in cluster setups with NAT.
ClientAdvertise string `mapstructure:"client_advertise" json:"client_advertise" yaml:"client_advertise" toml:"client_advertise"`
//HTTPHost define host use to expose monitoring api.
HTTPHost string `mapstructure:"http_host" json:"http_host" yaml:"http_host" toml:"http_host"`
//HTTPPort define port use to expose monitoring api.
HTTPPort int `mapstructure:"http_port" json:"http_port" yaml:"http_port" toml:"http_port"`
//HTTPPort define port use to expose monitoring api with tls.
HTTPSPort int `mapstructure:"https_port" json:"https_port" yaml:"https_port" toml:"https_port"`
//HTTPBasePath define the base path for monitoring endpoints.
HTTPBasePath string `mapstructure:"http_base_path" json:"http_base_path" yaml:"http_base_path" toml:"http_base_path"`
//ProfPort define the Profiling HTTP port to enable server for dynamic profiling.
ProfPort int `mapstructure:"prof_port" json:"prof_port" yaml:"prof_port" toml:"prof_port"`
//PidFile define the file path to store PID process.
PidFile string `mapstructure:"pid_file" json:"pid_file" yaml:"pid_file" toml:"pid_file"`
//PortsFileDir define the directory where ports file will be created like '<executable_name>_<pid>.ports'.
PortsFileDir string `mapstructure:"ports_file_dir" json:"ports_file_dir" yaml:"ports_file_dir" toml:"ports_file_dir"`
//Routes define a list of url to actively solicit a connection.
Routes []*url.URL `mapstructure:"routes" json:"routes" yaml:"routes" toml:"routes"`
//RoutesStr define the routes to actively solicit a connection.
RoutesStr string `mapstructure:"routes_str" json:"routes_str" yaml:"routes_str" toml:"routes_str"`
//NoSig is used to disable signal catch for server.
NoSig bool `mapstructure:"no_log" json:"no_log" yaml:"no_log" toml:"no_log"`
//Username is the username used for server connection (like flag --auth in NATS v2 documentation).
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used for server connection (like flag --auth in NATS v2 documentation).
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//Token define the token used for server connection (like flag --auth in NATS v2 documentation).
Token string `mapstructure:"token" json:"token" yaml:"token" toml:"token"`
//JetStream allow to enable or disable jetStream layer
JetStream bool `mapstructure:"jet_stream" json:"jet_stream" yaml:"jet_stream" toml:"jet_stream"`
//JetStreamMaxMemory define the maximum memory used for jetStream in memory store type
JetStreamMaxMemory int64 `mapstructure:"jet_stream_max_memory" json:"jet_stream_max_memory" yaml:"jet_stream_max_memory" toml:"jet_stream_max_memory"`
//JetStreamMaxStore define the maximum disk used for jetStream in file store type
JetStreamMaxStore int64 `mapstructure:"jet_stream_max_store" json:"jet_stream_max_store" yaml:"jet_stream_max_store" toml:"jet_stream_max_store"`
//StoreDir define the directory path for jetStream in file store type
StoreDir string `mapstructure:"store_dir" json:"store_dir" yaml:"store_dir" toml:"store_dir"`
// PermissionStoreDir is the permission apply if a folder is created
PermissionStoreDir os.FileMode `mapstructure:"permission_store_dir" json:"permission_store_dir" yaml:"permission_store_dir" toml:"permission_store_dir"`
//Tags describing the server.
//They will be included in varz and used as a filter criteria for some system requests
Tags jwt.TagList `mapstructure:"tags" json:"tags" yaml:"tags" toml:"tags"`
//TLS Enable tls for server.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//AllowNoTLS define if client no TLS connection are allowed or no.
AllowNoTLS bool `mapstructure:"allow_no_tls" json:"allow_no_tls" yaml:"allow_no_tls" toml:"allow_no_tls"`
//TLSTimeout define the timeout for tls handshake for client and http monitoring.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig Configuration map for tls for client and http monitoring.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
type ConfigCluster struct {
//Name define the name of the cluster.
Name string `mapstructure:"name" json:"name" yaml:"name" toml:"name"`
//Host define the network host to cluster listen on.
Host string `mapstructure:"host" json:"host" yaml:"host" toml:"host"`
//Port define the network port to cluster listen on.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//ListenStr define the cluster url from which members can solicit routes.
ListenStr string `mapstructure:"listen_str" json:"listen_str" yaml:"listen_str" toml:"listen_str"`
//Advertise define the cluster URL to advertise to other servers.
Advertise string `mapstructure:"advertise" json:"advertise" yaml:"advertise" toml:"advertise"`
//NoAdvertise specify if Advertise known cluster IPs to clients.
NoAdvertise bool `mapstructure:"no_advertise" json:"no_advertise" yaml:"no_advertise" toml:"no_advertise"`
//ConnectRetries define the number of connect retries for implicit routes.
ConnectRetries int `mapstructure:"connect_retries" json:"connect_retries" yaml:"connect_retries" toml:"connect_retries"`
//Username is the username used for cluster connection.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used for cluster connection.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
//Permissions define the scope permission assign to route connections.
Permissions ConfigPermissionsRoute `mapstructure:"permissions" json:"permissions" yaml:"permissions" toml:"permissions"`
//TLS Enable tls for cluster connection.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout define the timeout for tls handshake for cluster connection.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for cluster connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigGatewayRemote are options for connecting to a remote gateway
// NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.
type ConfigGatewayRemote struct {
//Name define the name of the current gateways destination.
Name string `mapstructure:"name" json:"name" yaml:"name" toml:"name"`
//URLs define a list of route for the current gateways destination.
URLs []*url.URL `mapstructure:"urls" json:"urls" yaml:"urls" toml:"urls"`
//TLS Enable tls for the current gateways destination.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout define the timeout for tls handshake for the current gateways destination.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for the current gateways destination.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigGateway are options for gateways.
// NOTE: This structure is no longer used for monitoring endpoints and json tags are deprecated and may be removed in the future.
type ConfigGateway struct {
//Name define the name of the gateway.
Name string `mapstructure:"name" json:"name" yaml:"name" toml:"name"`
//Host define the network host to listen on.
Host string `mapstructure:"host" json:"host" yaml:"host" toml:"host"`
//Port define the network port to listen on.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//Username is the username used for gateways connection.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used for gateways connection.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
//Advertise define the gateway URL to advertise to other servers.
Advertise string `mapstructure:"advertise" json:"advertise" yaml:"advertise" toml:"advertise"`
//ConnectRetries define the number of connect retries for implicit routes.
ConnectRetries int `mapstructure:"connect_retries" json:"connect_retries" yaml:"connect_retries" toml:"connect_retries"`
//Gateways define a list of route for gateways.
Gateways []*ConfigGatewayRemote `mapstructure:"gateways" json:"gateways" yaml:"gateways" toml:"gateways"`
//RejectUnknown allow to reject unknown cluster connection.
RejectUnknown bool `mapstructure:"reject_unknown" json:"reject_unknown" yaml:"reject_unknown" toml:"reject_unknown"`
//TLS Enable tls for gateways connection.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout define the timeout for tls handshake for gateways connection.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for gateways connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigLeaf are options for a given server to accept leaf node connections and/or connect to a remote cluster.
type ConfigLeaf struct {
//Host define the network host to listen on.
Host string `mapstructure:"host" json:"host" yaml:"host" toml:"host"`
//Port define the network port to listen on.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//Username is the username used for leaf connection.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used for leaf connection.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
//Advertise define the gateway URL to advertise to other servers.
Advertise string `mapstructure:"advertise" json:"advertise" yaml:"advertise" toml:"advertise"`
//NoAdvertise specify if Advertise known leaf node IPs to clients.
NoAdvertise bool `mapstructure:"no_advertise" json:"no_advertise" yaml:"no_advertise" toml:"no_advertise"`
//Account define the account under which leaf offer services.
Account string `mapstructure:"account" json:"account" yaml:"account" toml:"account"`
//Users Set the users list with account.
Users []ConfigUser `mapstructure:"users" json:"users" yaml:"users" toml:"users"`
//ReconnectInterval define the duration to wait after a closed connection and before trying to reconnect
ReconnectInterval time.Duration `mapstructure:"reconnect_interval" json:"reconnect_interval" yaml:"reconnect_interval" toml:"reconnect_interval"`
//Remotes define For solicited connections to other clusters/superclusters.
Remotes []*ConfigLeafRemote `mapstructure:"remotes" json:"remotes" yaml:"remotes" toml:"remotes"`
//TLS Enable tls for leaf connection.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout define the timeout for tls handshake for leaf connection.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for leaf connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigLeafRemote are options for connecting to a remote server as a leaf node.
type ConfigLeafRemote struct {
//LocalAccount define the local account to use for this remote cluster.
LocalAccount string `mapstructure:"local_account" json:"local_account" yaml:"local_account" toml:"local_account"`
//URLs define a list of route for the current gateways destination.
URLs []*url.URL `mapstructure:"urls" json:"urls" yaml:"urls" toml:"urls"`
//Credentials define the file path for authentication with credentials file
Credentials string `mapstructure:"credentials" json:"credentials" yaml:"credentials" toml:"credentials"`
//Hub define the remote connection as hub
Hub bool `mapstructure:"hub" json:"hub" yaml:"hub" toml:"hub"`
//DenyImports define a list of subject/queues denied for import
DenyImports []string `mapstructure:"deny_imports" json:"deny_imports" yaml:"deny_imports" toml:"deny_imports"`
//DenyExports define a list of subject/queues denied for export
DenyExports []string `mapstructure:"deny_exports" json:"deny_exports" yaml:"deny_exports" toml:"deny_exports"`
//Websocket define options for websocket connections.
// When an URL has the "ws" (or "wss") scheme, then the server will initiate the
// connection as a websocket connection. By default, the websocket frames will be
// masked (as if this server was a websocket client to the remote server). The
// NoMasking option will change this behavior and will send umasked frames.
Websocket struct {
//Compression define if compression is enable for remote ws connection
Compression bool `mapstructure:"compression" json:"compression" yaml:"compression" toml:"compression"`
//NoMasking define if the remote ws is must be masked.
//By default ws are masked but this option allow to expose it.
NoMasking bool `mapstructure:"no_masking" json:"no_masking" yaml:"no_masking" toml:"no_masking"`
} `mapstructure:"websocket" json:"websocket" yaml:"websocket" toml:"websocket"`
//TLS Enable tls for this remote cluster connection.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout define the timeout for tls handshake for this remote cluster connection.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for this remote cluster connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigWebsocket are options for websocket
type ConfigWebsocket struct {
//Host The server will accept websocket client connections on this hostname/IP.
Host string `mapstructure:"" json:"host" yaml:"host" toml:"host"`
//Port The server will accept websocket client connections on this port.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//Advertise The host:port to advertise to websocket clients in the cluster.
Advertise string `mapstructure:"advertise" json:"advertise" yaml:"advertise" toml:"advertise"`
//NoAuthUser define the default user for new client connection.
//If no user name is provided when a client connects, will default to the matching user from the global list of users in `Options.Users`.
NoAuthUser string `mapstructure:"no_auth_user" json:"no_auth_user" yaml:"no_auth_user" toml:"no_auth_user"`
//JWTCookie define the name of the cookie, which if present in WebSocket upgrade headers,
//will be treated as JWT during CONNECT phase as long as "jwt" specified in the CONNECT options is missing or empty.
JWTCookie string `mapstructure:"jwt_cookie" json:"jwt_cookie" yaml:"jwt_cookie" toml:"jwt_cookie"`
//Authentication section.
//If anything is configured in this section, it will override the authorization configuration of regular clients.
//Username is the username used.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//Token define the token used.
Token string `mapstructure:"token" json:"token" yaml:"token" toml:"token"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
//SameOrigin define if request's host must match the Origin header or not.
SameOrigin bool `mapstructure:"same_origin" json:"same_origin" yaml:"same_origin" toml:"same_origin"`
//AllowedOrigins define a list of allowed origin could not matching with request's host.
// Only origins in this list will be accepted. If empty and SameOrigin is false, any origin is accepted.
AllowedOrigins []string `mapstructure:"allowed_origins" json:"allowed_origins" yaml:"allowed_origins" toml:"allowed_origins"`
//Compression allow to activate compression between client and server.
//If set to true, the server will negotiate with clients if compression can be used.
//If this is false, no compression will be used (both in server and clients) since it has to be negotiated between both endpoints
Compression bool `mapstructure:"compression" json:"compression" yaml:"compression" toml:"compression"`
// NoTLS allow to start websocket server without tls.
NoTLS bool `mapstructure:"no_tls" json:"no_tls" yaml:"no_tls" toml:"no_tls"`
//HandshakeTimeout is the total time allowed for the server to read the client request and write the response back to the client.
//This include the time needed for the TLS Handshake.
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout" json:"handshake_timeout" yaml:"handshake_timeout" toml:"handshake_timeout"`
//TLSConfig define the tls configuration for this remote cluster connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}
// ConfigMQTT are options for MQTT
type ConfigMQTT struct {
//Host define the hostname/IP to accept MQTT client connections.
Host string `mapstructure:"" json:"host" yaml:"host" toml:"host"`
//Port define the port to accept MQTT client connections.
Port int `mapstructure:"port" json:"port" yaml:"port" toml:"port"`
//NoAuthUser define the default user for new client connection.
//If no user name is provided when a client connects, will default to the matching user from the global list of users in `Options.Users`.
NoAuthUser string `mapstructure:"no_auth_user" json:"no_auth_user" yaml:"no_auth_user" toml:"no_auth_user"`
//Authentication section.
//If anything is configured in this section, it will override the authorization configuration of regular clients.
//Username is the username used.
Username string `mapstructure:"username" json:"username" yaml:"username" toml:"username"`
//Password define the password used.
Password string `mapstructure:"password" json:"password" yaml:"password" toml:"password"`
//Token define the token used.
Token string `mapstructure:"token" json:"token" yaml:"token" toml:"token"`
//AuthTimeout define the timeout for authentication process.
AuthTimeout time.Duration `mapstructure:"auth_timeout" json:"auth_timeout" yaml:"auth_timeout" toml:"auth_timeout"`
// AckWait is the amount of time after which a QoS 1 message sent to
// a client is redelivered as a DUPLICATE if the server has not
// received the PUBACK on the original Packet Identifier.
// The value has to be positive.
// Zero will cause the server to use the default value (30 seconds).
// Note that changes to this option is applied only to new MQTT subscriptions.
AckWait time.Duration `mapstructure:"ack_wait" json:"ack_wait" yaml:"ack_wait" toml:"ack_wait"`
// MaxAckPending is the amount of QoS 1 messages the server can send to
// a subscription without receiving any PUBACK for those messages.
// The valid range is [0..65535].
// The total of subscriptions' MaxAckPending on a given session cannot
// exceed 65535. Attempting to create a subscription that would bring
// the total above the limit would result in the server returning 0x80
// in the SUBACK for this subscription.
// Due to how the NATS Server handles the MQTT "#" wildcard, each
// subscription ending with "#" will use 2 times the MaxAckPending value.
// Note that changes to this option is applied only to new subscriptions.
MaxAckPending uint16 `mapstructure:"max_ack_pending" json:"max_ack_pending" yaml:"max_ack_pending" toml:"max_ack_pending"`
//TLS Enable tls for this remote cluster connection.
TLS bool `mapstructure:"tls" json:"tls" yaml:"tls" toml:"tls"`
//TLSTimeout is the time needed for the TLS Handshake.
TLSTimeout time.Duration `mapstructure:"tls_timeout" json:"tls_timeout" yaml:"tls_timeout" toml:"tls_timeout"`
//TLSConfig define the tls configuration for MQTT client connection.
TLSConfig libtls.Config `mapstructure:"tls_config" json:"tls_config" yaml:"tls_config" toml:"tls_config"`
}

84
nats/errors.go Normal file
View File

@@ -0,0 +1,84 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package nats
import liberr "github.com/nabbar/golib/errors"
const (
ErrorParamsEmpty liberr.CodeError = iota + liberr.MinPkgNutsDB
ErrorParamsMissing
ErrorParamsMismatching
ErrorParamsInvalid
ErrorParamsInvalidNumber
ErrorConfigValidation
ErrorConfigInvalidJWTOperator
ErrorConfigInvalidAccount
ErrorConfigInvalidAllowedConnectionType
ErrorConfigInvalidFilePath
ErrorConfigJsonMarshall
ErrorConfigWriteInFile
ErrorClientConnect
ErrorClientClusterConnect
ErrorServerStart
)
var isCodeError = false
func IsCodeError() bool {
return isCodeError
}
func init() {
isCodeError = liberr.ExistInMapMessage(ErrorParamsEmpty)
liberr.RegisterIdFctMessage(ErrorParamsEmpty, getMessage)
}
func getMessage(code liberr.CodeError) (message string) {
switch code {
case liberr.UNK_ERROR:
return ""
case ErrorParamsEmpty:
return "at least one given parameter is empty"
case ErrorParamsMissing:
return "at least one given parameter is missing"
case ErrorParamsMismatching:
return "at least one given parameter does not match the awaiting type"
case ErrorParamsInvalid:
return "at least one given parameter is invalid"
case ErrorParamsInvalidNumber:
return "the number of parameters is not matching the awaiting number"
case ErrorConfigValidation:
return "config seems to be invalid"
case ErrorClientConnect:
return "cannot start new client connection to server"
case ErrorClientClusterConnect:
return "cannot start new client connection to cluster"
}
return ""
}

253
nats/server.go Normal file
View File

@@ -0,0 +1,253 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package nats
import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"
libtls "github.com/nabbar/golib/certificates"
liberr "github.com/nabbar/golib/errors"
natsrv "github.com/nats-io/nats-server/v2/server"
natcli "github.com/nats-io/nats.go"
)
const (
DefaultWaitReady = 50 * time.Millisecond
DefaultTickReady = 5 * DefaultWaitReady
)
type Server interface {
Listen(ctx context.Context) liberr.Error
Restart(ctx context.Context) liberr.Error
Shutdown()
GetOptions() *natsrv.Options
SetOptions(opt *natsrv.Options)
IsRunning() bool
IsReady() bool
WaitReady(ctx context.Context, tick time.Duration)
ClientAdvertise(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error)
ClientCluster(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error)
ClientServer(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error)
//StatusInfo() (name string, release string, hash string)
//StatusHealth() error
//StatusRoute(prefix string, fctMessage status.FctMessage, sts status.RouteStatus)
}
func NewServer(opt *natsrv.Options) Server {
o := new(atomic.Value)
if opt != nil {
o.Store(opt)
}
return &server{
o: o,
s: nil,
r: new(atomic.Value),
}
}
type server struct {
o *atomic.Value
s *natsrv.Server
r *atomic.Value
}
func (s *server) Listen(ctx context.Context) liberr.Error {
if s.IsRunning() || s.IsReady() {
s.Shutdown()
}
var e error
if s.s, e = natsrv.NewServer(s.GetOptions()); e != nil {
return ErrorServerStart.ErrorParent(e)
}
s.s.ConfigureLogger()
s.s.Start()
s.setRunning(true)
s.WaitReady(ctx, 0)
return nil
}
func (s *server) Restart(ctx context.Context) liberr.Error {
return s.Listen(ctx)
}
func (s *server) Shutdown() {
if s.s != nil {
s.s.Shutdown()
}
s.setRunning(false)
}
func (s *server) GetOptions() *natsrv.Options {
if s.o == nil {
s.o = new(atomic.Value)
}
if i := s.o.Load(); i == nil {
return nil
} else if o, ok := i.(*natsrv.Options); !ok {
return nil
} else {
return o
}
}
func (s *server) SetOptions(opt *natsrv.Options) {
if opt == nil {
s.o = new(atomic.Value)
return
} else if s.o == nil {
s.o = new(atomic.Value)
}
s.o.Store(opt)
}
func (s *server) IsRunning() bool {
if s.r == nil {
s.r = new(atomic.Value)
}
if i := s.r.Load(); i == nil {
return false
} else if r, ok := i.(bool); !ok {
return false
} else {
return r
}
}
func (s *server) setRunning(run bool) {
if s.r == nil {
s.r = new(atomic.Value)
}
s.r.Store(run)
}
func (s *server) IsReady() bool {
if s.s != nil {
return s.s.ReadyForConnections(DefaultWaitReady)
}
return false
}
func (s *server) WaitReady(ctx context.Context, tick time.Duration) {
if tick == 0 {
tick = DefaultTickReady
}
for {
if s.IsReady() {
return
}
time.Sleep(tick)
if ctx.Err() != nil {
return
}
}
}
func (s *server) ClientAdvertise(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error) {
if o := s.GetOptions(); o != nil && o.ClientAdvertise != "" {
opt.Url = s.formatAddress(o.ClientAdvertise)
} else {
return nil, ErrorConfigValidation.Error(nil)
}
return opt.NewClient(defTls)
}
func (s *server) ClientCluster(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error) {
s.WaitReady(ctx, tick)
if cAddr := s.s.ClusterAddr(); cAddr != nil && cAddr.String() != "" {
opt.Url = s.formatAddress(cAddr.String())
} else {
return nil, ErrorConfigValidation.Error(nil)
}
return opt.NewClient(defTls)
}
func (s *server) ClientServer(ctx context.Context, tick time.Duration, defTls libtls.TLSConfig, opt Client) (cli *natcli.Conn, err liberr.Error) {
var o *natsrv.Options
if o = s.GetOptions(); o == nil {
return nil, ErrorConfigValidation.Error(nil)
}
s.WaitReady(ctx, tick)
if sAddr := s.s.Addr(); sAddr != nil && sAddr.String() != "" {
opt.Url = s.formatAddress(sAddr.String())
} else if o.Host != "" && o.Port > 0 {
opt.Url = s.formatAddress(fmt.Sprintf("%s:%d", o.Host, o.Port))
} else {
return nil, ErrorConfigValidation.Error(nil)
}
return opt.NewClient(defTls)
}
func (s *server) formatAddress(addr string) string {
if addr == "" {
return ""
}
if strings.Contains(addr, ",") {
var b = make([]string, 0)
for _, a := range strings.Split(addr, ",") {
b = append(b, s.formatAddress(a))
}
return strings.Join(b, ",")
}
if strings.HasPrefix(addr, "nats://") {
return addr
} else {
return fmt.Sprintf("nats://%s", addr)
}
}

420
test/test-nats/main.go Normal file
View File

@@ -0,0 +1,420 @@
/***********************************************************************************************************************
*
* MIT License
*
* Copyright (c) 2021 Nicolas JUHEL
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
**********************************************************************************************************************/
package main
import (
"context"
crand "crypto/rand"
"encoding/binary"
"fmt"
"log"
"math/rand"
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/fxamacker/cbor/v2"
libtls "github.com/nabbar/golib/certificates"
liberr "github.com/nabbar/golib/errors"
liblog "github.com/nabbar/golib/logger"
libnat "github.com/nabbar/golib/nats"
libpwd "github.com/nabbar/golib/password"
libsem "github.com/nabbar/golib/semaphore"
"github.com/nats-io/jwt/v2"
natcli "github.com/nats-io/nats.go"
)
const (
BasePortServer = 9000
BasePortCluster = 9100
BasePortHttp = 9200
BasePortProf = 9300
BasePathFolder = "/nats"
SubLogFile = "nats-node-%d.log"
SubNodeDir = "node-%d"
NbNodeInstance = 3
NbEntries = 1000000
nameProducer = "produser"
nameSubsriber = "subuser"
cptCluster = "cluster"
usrCluster = "cluster"
pwdCluster = "cLu!123-test"
Subject = "_INBOX"
)
var (
cluster []libnat.Server
ctx context.Context
cnl context.CancelFunc
rng *rand.Rand
)
type Message struct {
Id int
Random []string
}
func init() {
liblog.SetLevel(liblog.InfoLevel)
liblog.EnableColor()
liblog.AddGID(true)
liblog.FileTrace(false)
liberr.SetModeReturnError(liberr.ErrorReturnCodeErrorTrace)
cluster = make([]libnat.Server, NbNodeInstance)
rng = rand.New(&cryptoSource{})
}
func main() {
ctx, cnl = context.WithCancel(context.Background())
defer cnl()
for i := 0; i < NbNodeInstance; i++ {
cfg := configServer(i)
if opt, err := cfg.NatsOption(nil); err != nil {
panic(err.GetErrorFull(""))
} else if err = cfg.LogConfigJson(); err != nil {
panic(err.CodeErrorTraceFull("", ""))
} else {
cluster[i] = libnat.NewServer(opt)
}
}
defer func() {
for i := 0; i < NbNodeInstance; i++ {
if cluster[i] != nil {
cluster[i].Shutdown()
}
}
}()
for i := 0; i < NbNodeInstance; i++ {
go func(ctx context.Context, clu libnat.Server) {
if e := clu.Listen(ctx); e != nil {
panic(e.CodeErrorTraceFull("", ""))
}
}(ctx, cluster[i])
}
cluster[0].WaitReady(ctx, 200*time.Millisecond)
sem := libsem.NewSemaphoreWithContext(ctx, 0)
defer sem.DeferMain()
optSub := configClient(nameSubsriber, usrCluster, pwdCluster)
sub, err := optSub.NewClient(nil)
if err != nil {
panic(err.CodeErrorTraceFull("", ""))
}
defer func() {
if sub != nil && !sub.IsClosed() {
sub.Close()
}
}()
s, e := sub.Subscribe(Subject, func(msg *natcli.Msg) {
if !msg.Sub.IsValid() {
return
}
go func(m *natcli.Msg) {
id, random := Unserialize(m.Data)
r := time.Duration(rng.Intn(1000)) * time.Millisecond
time.Sleep(r)
_, _ = fmt.Fprintf(os.Stdout, "Subscriber read id '%06d' (after %s wait) : %s\n", id, r.String(), strings.Join(random, "|"))
}(msg)
})
if e != nil {
panic(e)
}
defer func() {
if s == nil {
return
}
if !s.IsValid() {
return
}
if e = s.Unsubscribe(); e != nil {
panic(e)
}
}()
if !s.IsValid() {
panic(s)
}
time.Sleep(3 * time.Second)
optPrd := configClient(nameProducer, usrCluster, pwdCluster)
prd, err := optPrd.NewClient(nil)
if err != nil {
panic(err)
}
defer func() {
if prd != nil && !prd.IsClosed() {
prd.Close()
}
}()
for i := 0; i < NbEntries; i++ {
msg := []string{
libpwd.Generate(64),
libpwd.Generate(64),
libpwd.Generate(64),
libpwd.Generate(64),
}
if err = sem.NewWorker(); err != nil {
panic(err)
}
go func(idx int, msg []string, sem libsem.Sem) {
defer sem.DeferWorker()
time.Sleep(500 * time.Microsecond)
if e := prd.Publish(Subject, Serialize(idx, msg)); e != nil {
panic(e)
}
_, _ = fmt.Fprintf(os.Stdout, "Publisher write id '%06d' : %s\n", idx, strings.Join(msg, "|"))
}(i, msg, sem)
}
if e := sem.WaitAll(); e != nil {
panic(e)
}
time.Sleep(10 * time.Second)
if e := s.Unsubscribe(); e != nil {
panic(e)
}
prd.Close()
time.Sleep(200 * time.Millisecond)
sub.Close()
println(strings.Join(GetMemUsage(), "\n"))
}
func Serialize(idx int, random []string) []byte {
msg := Message{
Id: idx,
Random: random,
}
if r, e := cbor.Marshal(msg); e != nil {
panic(e)
} else {
return r
}
}
func Unserialize(p []byte) (idx int, random []string) {
msg := Message{}
if e := cbor.Unmarshal(p, &msg); e != nil {
panic(e)
} else {
return msg.Id, msg.Random
}
}
func configServer(id int) libnat.Config {
rts := make([]*url.URL, 0)
for j := 0; j < NbNodeInstance; j++ {
if j == id {
continue
}
rts = append(rts, &url.URL{
Scheme: "nats",
Opaque: "",
User: url.UserPassword(usrCluster, pwdCluster),
Host: fmt.Sprintf("127.0.0.1:%d", BasePortCluster+j),
Path: "",
RawPath: "",
ForceQuery: false,
RawQuery: "",
Fragment: "",
RawFragment: "",
})
}
return libnat.Config{
Server: libnat.ConfigSrv{
Name: fmt.Sprintf("node-%d", id),
Host: "127.0.0.1",
Port: BasePortServer + id,
HTTPHost: "127.0.0.1",
HTTPPort: BasePortHttp + id,
ProfPort: BasePortProf + id,
Routes: rts,
NoSig: true,
JetStream: true,
StoreDir: fmt.Sprintf(filepath.Join(BasePathFolder, SubNodeDir), id),
PermissionStoreDir: 0755,
TLS: false,
AllowNoTLS: true,
TLSConfig: libtls.Config{
InheritDefault: true,
},
},
Cluster: libnat.ConfigCluster{
Name: "Test-cluster",
Host: "127.0.0.1",
Port: BasePortCluster + id,
ConnectRetries: 5,
TLS: false,
TLSConfig: libtls.Config{
InheritDefault: true,
},
},
Gateways: libnat.ConfigGateway{},
Leaf: libnat.ConfigLeaf{},
Websockets: libnat.ConfigWebsocket{},
MQTT: libnat.ConfigMQTT{},
Limits: libnat.ConfigLimits{},
Logs: libnat.ConfigLogger{
LogFile: fmt.Sprintf(filepath.Join(BasePathFolder, SubLogFile), id),
PermissionFolderLogFile: 0755,
PermissionFileLogFile: 0644,
Syslog: false,
},
Auth: libnat.ConfigAuth{
NKeys: nil,
Users: []libnat.ConfigUser{
{
Username: usrCluster,
Password: pwdCluster,
Account: cptCluster,
AllowedConnectionTypes: []string{
jwt.ConnectionTypeStandard,
jwt.ConnectionTypeLeafnode,
jwt.ConnectionTypeWebsocket,
jwt.ConnectionTypeMqtt,
},
},
},
Accounts: []libnat.ConfigAccount{
{
Name: cptCluster,
Permission: libnat.ConfigPermissionsUser{
Publish: libnat.ConfigPermissionSubject{
Allow: []string{
">",
"*",
},
Deny: make([]string, 0),
},
Subscribe: libnat.ConfigPermissionSubject{
Allow: []string{
">",
"*",
},
Deny: make([]string, 0),
},
Response: libnat.ConfigPermissionResponse{
MaxMsgs: NbEntries,
Expires: time.Second,
},
},
},
},
SystemAccount: cptCluster,
NoSystemAccount: false,
AllowNewAccounts: true,
TrustedKeys: make([]string, 0),
TrustedOperators: make([]string, 0),
},
Customs: nil,
}
}
func configClient(name, user, pass string) libnat.Client {
var srv = make([]string, 0)
for i := 0; i < NbNodeInstance; i++ {
srv = append(srv, fmt.Sprintf("nats://127.0.0.1:%d", BasePortServer+i))
}
return libnat.Client{
Name: name,
Servers: srv,
Pedantic: false,
AllowReconnect: true,
User: user,
Password: pass,
TLSConfig: libtls.Config{
InheritDefault: true,
},
}
}
func GetMemUsage() []string {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return []string{
fmt.Sprintf("\t - Alloc = %v MiB", m.Alloc/1024/1024),
fmt.Sprintf("\t - TotalAlloc = %v MiB", m.TotalAlloc/1024/1024),
fmt.Sprintf("\t - Sys = %v MiB", m.Sys/1024/1024),
fmt.Sprintf("\t - NumGC = %v\n", m.NumGC),
}
}
// random functions
type cryptoSource struct{}
func (s cryptoSource) Seed(seed int64) {}
func (s cryptoSource) Int63() int64 {
return int64(s.Uint64() & ^uint64(1<<63))
}
func (s cryptoSource) Uint64() (v uint64) {
err := binary.Read(crand.Reader, binary.BigEndian, &v)
if err != nil {
log.Fatal(err)
}
return v
}