diff --git a/server/server.go b/server/server.go index f1760bc..f467429 100644 --- a/server/server.go +++ b/server/server.go @@ -5,11 +5,13 @@ import ( "errors" "fmt" "io" + "log" "net" "strconv" "sync/atomic" "time" + "github.com/logrusorgru/aurora" "github.com/mochi-co/mqtt/server/events" "github.com/mochi-co/mqtt/server/internal/circ" "github.com/mochi-co/mqtt/server/internal/clients" @@ -64,6 +66,7 @@ type Server struct { inline inlineMessages // channels for direct publishing. Events events.Events // overrideable event hooks. Store persistence.Store // a persistent storage backend if desired. + Options *Options // configurable server options. Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections. Clients *clients.Clients // clients which are known to the broker. Topics *topics.Index // an index of topic filter subscriptions and retained messages. @@ -73,17 +76,38 @@ type Server struct { done chan bool // indicate that the server is ending. } +// Options contains configurable options for the server. +type Options struct { + // BufferSize overrides the default buffer size (circ.DefaultBufferSize) for the client buffers. + BufferSize int + + // BufferBlockSize overrides the default buffer block size (DefaultBlockSize) for the client buffers. + BufferBlockSize int +} + // inlineMessages contains channels for handling inline (direct) publishing. type inlineMessages struct { done chan bool // indicate that the server is ending. pub chan packets.Packet // a channel of packets to publish to clients } -// New returns a new instance of an MQTT broker. +// New returns a new instance of MQTT server with no options. +// This method has been deprecated and will be removed in a future release. +// Please use NewServer instead. func New() *Server { + log.Println(aurora.Red("mqtt.New() has been deprecated and will be removed in a future release - please use mqtt.NewServer(opts *Options) instead!")) + return NewServer(nil) +} + +// NewServer returns a new instance of an MQTT broker with optional values where applicable. +func NewServer(opts *Options) *Server { + if opts == nil { + opts = new(Options) + } + s := &Server{ done: make(chan bool), - bytepool: circ.NewBytesPool(circ.DefaultBufferSize), + bytepool: circ.NewBytesPool(opts.BufferSize), Clients: clients.New(), Topics: topics.New(), System: &system.Info{ @@ -95,7 +119,8 @@ func New() *Server { done: make(chan bool), pub: make(chan packets.Packet, 1024), }, - Events: events.Events{}, + Events: events.Events{}, + Options: opts, } // Expose server stats using the system listener so it can be used in the @@ -238,8 +263,8 @@ func (s *Server) EstablishConnection(lid string, c net.Conn, ac auth.Controller) xbw := s.bytepool.Get() // and for sending. cl := clients.NewClient(c, - circ.NewReaderFromSlice(0, xbr), - circ.NewWriterFromSlice(0, xbw), + circ.NewReaderFromSlice(s.Options.BufferBlockSize, xbr), + circ.NewWriterFromSlice(s.Options.BufferBlockSize, xbw), s.System, )