Add Server Options

Adds a new struct of server options which can be used to override default properties. A new options-accepting NewServer function has been created to supersede the New method, which is now deprecated.
This commit is contained in:
mochi
2022-03-31 16:48:29 +01:00
parent d946a9ae16
commit a0060429d1

View File

@@ -5,11 +5,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/logrusorgru/aurora"
"github.com/mochi-co/mqtt/server/events" "github.com/mochi-co/mqtt/server/events"
"github.com/mochi-co/mqtt/server/internal/circ" "github.com/mochi-co/mqtt/server/internal/circ"
"github.com/mochi-co/mqtt/server/internal/clients" "github.com/mochi-co/mqtt/server/internal/clients"
@@ -64,6 +66,7 @@ type Server struct {
inline inlineMessages // channels for direct publishing. inline inlineMessages // channels for direct publishing.
Events events.Events // overrideable event hooks. Events events.Events // overrideable event hooks.
Store persistence.Store // a persistent storage backend if desired. Store persistence.Store // a persistent storage backend if desired.
Options *Options // configurable server options.
Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections. Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections.
Clients *clients.Clients // clients which are known to the broker. Clients *clients.Clients // clients which are known to the broker.
Topics *topics.Index // an index of topic filter subscriptions and retained messages. Topics *topics.Index // an index of topic filter subscriptions and retained messages.
@@ -73,17 +76,38 @@ type Server struct {
done chan bool // indicate that the server is ending. done chan bool // indicate that the server is ending.
} }
// Options contains configurable options for the server.
type Options struct {
// BufferSize overrides the default buffer size (circ.DefaultBufferSize) for the client buffers.
BufferSize int
// BufferBlockSize overrides the default buffer block size (DefaultBlockSize) for the client buffers.
BufferBlockSize int
}
// inlineMessages contains channels for handling inline (direct) publishing. // inlineMessages contains channels for handling inline (direct) publishing.
type inlineMessages struct { type inlineMessages struct {
done chan bool // indicate that the server is ending. done chan bool // indicate that the server is ending.
pub chan packets.Packet // a channel of packets to publish to clients pub chan packets.Packet // a channel of packets to publish to clients
} }
// New returns a new instance of an MQTT broker. // New returns a new instance of MQTT server with no options.
// This method has been deprecated and will be removed in a future release.
// Please use NewServer instead.
func New() *Server { func New() *Server {
log.Println(aurora.Red("mqtt.New() has been deprecated and will be removed in a future release - please use mqtt.NewServer(opts *Options) instead!"))
return NewServer(nil)
}
// NewServer returns a new instance of an MQTT broker with optional values where applicable.
func NewServer(opts *Options) *Server {
if opts == nil {
opts = new(Options)
}
s := &Server{ s := &Server{
done: make(chan bool), done: make(chan bool),
bytepool: circ.NewBytesPool(circ.DefaultBufferSize), bytepool: circ.NewBytesPool(opts.BufferSize),
Clients: clients.New(), Clients: clients.New(),
Topics: topics.New(), Topics: topics.New(),
System: &system.Info{ System: &system.Info{
@@ -95,7 +119,8 @@ func New() *Server {
done: make(chan bool), done: make(chan bool),
pub: make(chan packets.Packet, 1024), pub: make(chan packets.Packet, 1024),
}, },
Events: events.Events{}, Events: events.Events{},
Options: opts,
} }
// Expose server stats using the system listener so it can be used in the // Expose server stats using the system listener so it can be used in the
@@ -238,8 +263,8 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller)
xbw := s.bytepool.Get() // and for sending. xbw := s.bytepool.Get() // and for sending.
cl := clients.NewClient(c, cl := clients.NewClient(c,
circ.NewReaderFromSlice(0, xbr), circ.NewReaderFromSlice(s.Options.BufferBlockSize, xbr),
circ.NewWriterFromSlice(0, xbw), circ.NewWriterFromSlice(s.Options.BufferBlockSize, xbw),
s.System, s.System,
) )