Progress on persistence

This commit is contained in:
Mochi
2020-01-15 21:32:40 +00:00
parent 5073246818
commit 79d1ee392b
11 changed files with 617 additions and 14 deletions

View File

@@ -12,7 +12,6 @@ import (
mqtt "github.com/mochi-co/mqtt/server"
"github.com/mochi-co/mqtt/server/listeners"
"github.com/mochi-co/mqtt/server/listeners/auth"
// _ "net/http/pprof"
// "runtime/trace"
)

View File

@@ -0,0 +1,66 @@
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"github.com/logrusorgru/aurora"
mqtt "github.com/mochi-co/mqtt/server"
"github.com/mochi-co/mqtt/server/listeners"
)
func main() {
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("Persistence"))
server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1883")
err := server.AddListener(tcp, &listeners.Config{
Auth: new(Auth),
})
if err != nil {
panic(err)
}
// Start broker...
go server.Serve()
fmt.Println(aurora.BgMagenta(" Started! "))
// Wait for signals...
<-done
fmt.Println(aurora.BgRed(" Caught Signal "))
// End gracefully.
server.Close()
fmt.Println(aurora.BgGreen(" Finished "))
}
// Auth is an example auth provider for the server.
type Auth struct{}
// Auth returns true if a username and password are acceptable.
// Auth always returns true.
func (a *Auth) Authenticate(user, password []byte) bool {
return true
}
// ACL returns true if a user has access permissions to read or write on a topic.
// ACL is used to deny access to a specific topic to satisfy Test.test_subscribe_failure.
func (a *Auth) ACL(user []byte, topic string, write bool) bool {
if topic == "test/nosubscribe" {
return false
}
return true
}

View File

@@ -54,7 +54,7 @@ func main() {
done <- true
}()
fmt.Println(aurora.Magenta("Mochi MQTT Broker initializing..."))
fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("TLS/SSL"))
server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1883")

8
go.mod
View File

@@ -3,15 +3,15 @@ module github.com/mochi-co/mqtt
go 1.13
require (
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.2
github.com/asdine/storm v2.1.2+incompatible
github.com/asdine/storm/v3 v3.1.0
github.com/gorilla/websocket v1.4.1
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a
github.com/logrusorgru/aurora v0.0.0-20191116043053-66b7ad493a23
github.com/rs/xid v1.2.1
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
go.etcd.io/bbolt v1.3.3
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
)
replace github.com/mochi-co/debug => /Users/mochimochi/Development/Go/src/github.com/mochi-co/debug

41
go.sum
View File

@@ -1,15 +1,29 @@
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM=
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
github.com/asdine/storm/v3 v3.1.0 h1:yrpSNS+E7ef5Y5KjyZDeyW72Dl17lYG7oZ7eUoWvo5s=
github.com/asdine/storm/v3 v3.1.0/go.mod h1:letAoLCXz4UfodwNgMNILMb2oRH+su337ZfHnkRzqDA=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a h1:zPPuIq2jAWWPTrGt70eK/BSch+gFAGrNzecsoENgu2o=
github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a/go.mod h1:yL958EeXv8Ylng6IfnvG4oflryUi3vgA3xPs9hmII1s=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/logrusorgru/aurora v0.0.0-20191116043053-66b7ad493a23 h1:Wp7NjqGKGN9te9N/rvXYRhlVcrulGdxnz8zadXWs7fc=
github.com/logrusorgru/aurora v0.0.0-20191116043053-66b7ad493a23/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -17,14 +31,29 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 h1:efeOvDhwQ29Dj3SdAV/MJf8oukgn+8D8WgaCaRMchF8=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191105142833-ac3223d80179 h1:IqVhUQp5B9ARnZUcfqXy6zP+A+YuPpP7IFo8gFeCOzU=
golang.org/x/sys v0.0.0-20191105142833-ac3223d80179/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -0,0 +1,140 @@
package bolt
import (
//"encoding/gob"
"time"
sgob "github.com/asdine/storm/codec/gob"
"github.com/asdine/storm/v3"
"go.etcd.io/bbolt"
"github.com/mochi-co/mqtt/server/persistence"
)
const (
defaultPath = "mochi.db"
defaultTimeout = 250 * time.Millisecond
)
// Store is a backend for writing and reading to bolt persistent storage.
type Store struct {
path string // the path on which to store the db file.
opts *bbolt.Options // options for configuring the boltdb instance.
db *storm.DB // the boltdb instance.
}
// init registers storage structs in gob.
func init() {
//gob.Register(map[string]interface{}{})
}
// New returns a configured instance of the boltdb store.
func New(path string, opts *bbolt.Options) *Store {
if path == "" || path == "." {
path = defaultPath
}
if opts == nil {
opts = &bbolt.Options{
Timeout: defaultTimeout,
}
}
return &Store{
path: path,
opts: opts,
}
}
// Open opens the boltdb instance.
func (s *Store) Open() error {
var err error
s.db, err = storm.Open(s.path, storm.BoltOptions(0600, s.opts), storm.Codec(sgob.Codec))
if err != nil {
return err
}
return nil
}
// Close closes the boltdb instance.
func (s *Store) Close() {
s.db.Close()
}
// StoreSubscriptions writes all subscriptions to the boltdb instance as
// a bulk operation.
func (s *Store) StoreSubscriptions() {
}
// StoreClients writes all clients to the boltdb instance as a bulk operation.
func (s *Store) StoreClients() {
}
// StoreInflight writes all inflight messages to the boltdb instance as a bulk operation.
func (s *Store) StoreInflight() {
}
// StoreInflight writes all inflight messages to the boltdb instance as a bulk operation.
func (s *Store) StoreRetained() {
}
// StoreServerInfo writes the server info to the boltdb instance.
func (s *Store) StoreServerInfo(v persistence.ServerInfo) error {
err := s.db.Save(&v)
if err != nil {
return err
}
return nil
}
// WriteSubscription writes a single subscription to the boltdb instance.
func (s *Store) WriteSubscription(v persistence.Subscription) error {
err := s.db.Save(&v)
if err != nil {
return err
}
return nil
}
// WriteInflight writes a single inflight message to the boltdb instance.
func (s *Store) WriteInflight() {
}
// WriteClient writes a single client to the boltdb instance.
func (s *Store) WriteClient() {
}
// LoadSubscriptions loads all the subscriptions from the boltdb instance.
func (s *Store) LoadSubscriptions() (v []persistence.Subscription, err error) {
err = s.db.Find("T", "subscription", &v)
if err != nil {
return
}
return
}
// LoadClients loads all the clients from the boltdb instance.
func (s *Store) LoadClients() {
}
// LoadInflight loads all the inflight messages from the boltdb instance.
func (s *Store) LoadInflight() {
}
// LoadServerInfo loads the server info from the boltdb instance.
func (s *Store) LoadServerInfo() (v persistence.ServerInfo, err error) {
err = s.db.One("ID", "server_info", &v)
if err != nil {
return
}
return
}

View File

@@ -0,0 +1,101 @@
package bolt
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"github.com/mochi-co/mqtt/server/persistence"
"github.com/mochi-co/mqtt/server/system"
)
const tmpPath = "testbolt.db"
func teardown(t *testing.T) {
err := os.Remove(tmpPath)
require.NoError(t, err)
}
func TestNew(t *testing.T) {
s := New(tmpPath, &bbolt.Options{
Timeout: 500 * time.Millisecond,
})
require.NotNil(t, s)
require.Equal(t, tmpPath, s.path)
require.Equal(t, 500*time.Millisecond, s.opts.Timeout)
}
func TestNewNoPath(t *testing.T) {
s := New("", nil)
require.NotNil(t, s)
require.Equal(t, defaultPath, s.path)
}
func TestNewNoOpts(t *testing.T) {
s := New("", nil)
require.NotNil(t, s)
require.Equal(t, defaultTimeout, s.opts.Timeout)
}
func TestOpen(t *testing.T) {
s := New(tmpPath, nil)
err := s.Open()
require.NoError(t, err)
defer teardown(t)
require.NotNil(t, s.db)
}
func TestStoreAndRetrieveServerInfo(t *testing.T) {
s := New(tmpPath, nil)
err := s.Open()
require.NoError(t, err)
defer teardown(t)
v := system.Info{
Version: "test",
Started: 100,
}
err = s.StoreServerInfo(persistence.ServerInfo{v, "server_info"})
require.NoError(t, err)
r, err := s.LoadServerInfo()
require.NoError(t, err)
require.NotNil(t, r)
require.Equal(t, v.Version, r.Version)
require.Equal(t, v.Started, r.Started)
}
func TestWriteAndRetrieveSubscription(t *testing.T) {
s := New(tmpPath, nil)
err := s.Open()
require.NoError(t, err)
defer teardown(t)
v := persistence.Subscription{
ID: "test:a/b/c",
Client: "test",
Filter: "a/b/c",
QoS: 1,
T: "subscription",
}
err = s.WriteSubscription(v)
require.NoError(t, err)
v2 := persistence.Subscription{
ID: "test:d/e/f",
Client: "test",
Filter: "d/e/f",
QoS: 2,
T: "subscription",
}
err = s.WriteSubscription(v2)
require.NoError(t, err)
subs, err := s.LoadSubscriptions()
require.NoError(t, err)
require.Equal(t, 2, len(subs))
}

View File

@@ -0,0 +1,160 @@
package persistence
import (
"errors"
"github.com/mochi-co/mqtt/server/system"
)
// Store is an interface which details a persistent storage connector.
type Store interface {
Open() error
Close()
WriteSubscription() // including retained
WriteClient()
WriteInFlight()
WriteRetained()
StoreSubscriptions()
StoreInFlight()
StoreClients()
StoreServerInfo(v ServerInfo) error
ReadSubscriptions() (v []Subscription, err error)
ReadInflight()
ReadClients()
ReadServerInfo()
}
// ServerInfo contains information and statistics about the server.
type ServerInfo struct {
system.Info // embed the system info struct.
ID string // the storage key.
}
// Subscription contains the details of a topic filter subscription.
type Subscription struct {
ID string // the storage key.
T string // the type of the stored data.
Client string // the id of the client who the subscription belongs to.
Filter string // the topic filter being subscribed to.
QoS byte // the desired QoS byte.
}
// Message contains the details of a retained or inflight message.
type Message struct {
ID string // the storage key.
T string // the type of the stored data.
FixedHeader FixedHeader // the header properties of the message.
PacketID uint16 // the unique id of the packet (if inflight).
TopicName string // the topic the message was sent to (if retained).
Payload []byte // the message payload (if retained).
Sent int64 // the last time the message was sent (for retries) in unixtime (if inflight).
Resends int // the number of times the message was attempted to be sent (if inflight).
}
// FixedHeader contains the fixed header properties of a message.
type FixedHeader struct {
Type byte // the type of the packet (PUBLISH, SUBSCRIBE, etc) from bits 7 - 4 (byte 1).
Dup bool // indicates if the packet was already sent at an earlier time.
Qos byte // indicates the quality of service expected.
Retain bool // whether the message should be retained.
Remaining int // the number of remaining bytes in the payload.
}
/*
// Client contains client data that can be persistently stored.
type Client struct {
ID string // the id of the client
Listener string // the last known listener id for the client
Username []byte // the username the client authenticated with.
CleanSession bool // indicates if the client connected expecting a clean-session.
Subscriptions []Subscription // a list of the subscriptions the user has.
LWT LWT // the last-will-and-testament message for the client.
}
// LWT contains details about a clients LWT payload.
type LWT struct {
Topic string // the topic the will message shall be sent to.
Message []byte // the message that shall be sent when the client disconnects.
Qos byte // the quality of service desired.
Retain bool // indicates whether the will message should be retained
}
*/
/*
*
*
*
*/
// MockStore is a mock storage backend for testing.
type MockStore struct {
FailOpen bool
Closed bool
Opened bool
}
// Open opens the storage instance.
func (s *MockStore) Open() error {
if s.FailOpen {
return errors.New("test")
}
s.Opened = true
return nil
}
// Close closes the storage instance.
func (s *MockStore) Close() {
s.Closed = true
}
// StoreSubscriptions writes all subscriptions to the storage instance.
func (s *MockStore) StoreSubscriptions() {
}
// StoreClients writes all clients to the storage instance.
func (s *MockStore) StoreClients() {}
// StoreInFlight writes all Inflight messages to the storage instance.
func (s *MockStore) StoreInflight() {}
// StoreRetained writes all Inflight messages to the storage instance.
func (s *MockStore) StoreRetained() {}
// StoreServerInfo writes the server info to the storage instance.
func (s *MockStore) StoreServerInfo(v ServerInfo) {
}
// WriteSubscription writes a single subscription to the storage instance.
func (s *MockStore) WriteSubscription() {}
// WriteClient writes a single client to the storage instance.
func (s *MockStore) WriteClient() {}
// WriteInFlight writes a single InFlight message to the storage instance.
func (s *MockStore) WriteInflight() {}
// WriteRetained writes a single retained message to the storage instance.
func (s *MockStore) WriteRetained() {}
// LoadSubscriptions loads the subscriptions from the storage instance.
func (s *MockStore) LoadSubscriptions() (v []Subscription, err error) {
return
}
// LoadClients loads the clients from the storage instance.
func (s *MockStore) LoadClients() {}
// LoadInflight loads the inflight messages from the storage instance.
func (s *MockStore) LoadInflight() {}
// LoadServerInfo loads the server info from the storage instance.
func (s *MockStore) LoadServerInfo() (v ServerInfo, err error) {
return
}

View File

@@ -0,0 +1,27 @@
package persistence
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMockStoreOpen(t *testing.T) {
s := new(MockStore)
err := s.Open()
require.NoError(t, err)
require.Equal(t, true, s.Opened)
}
func TestMockStoreOpenFail(t *testing.T) {
s := new(MockStore)
s.FailOpen = true
err := s.Open()
require.Error(t, err)
}
func TestMockStoreClose(t *testing.T) {
s := new(MockStore)
s.Close()
require.Equal(t, true, s.Closed)
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/mochi-co/mqtt/server/internal/topics"
"github.com/mochi-co/mqtt/server/listeners"
"github.com/mochi-co/mqtt/server/listeners/auth"
"github.com/mochi-co/mqtt/server/persistence"
"github.com/mochi-co/mqtt/server/system"
)
@@ -40,6 +41,7 @@ type Server struct {
Clients *clients.Clients // clients known to the broker.
Topics *topics.Index // an index of topic subscriptions and retained messages.
System *system.Info // values commonly found in $SYS topics.
Stores []persistence.Store // a slice of persistent storage backends.
sysTicker *time.Ticker // the interval ticker for sending updating $SYS topics.
}
@@ -54,6 +56,7 @@ func New() *Server {
Version: Version,
Started: time.Now().Unix(),
},
Stores: make([]persistence.Store, 0, 1),
sysTicker: time.NewTicker(SysTopicInterval * time.Millisecond),
}
@@ -64,6 +67,24 @@ func New() *Server {
return s
}
// AddStore adds a persistent storage backend to the server.
func (s *Server) AddStore(p persistence.Store) error {
err := p.Open()
if err != nil {
return err
}
s.Stores = append(s.Stores, p)
return nil
}
// CloseStores closes down all storage backends.
func (s *Server) CloseStores() {
for _, v := range s.Stores {
v.Close()
}
}
// AddListener adds a new network listener to the server.
func (s *Server) AddListener(listener listeners.Listener, config *listeners.Config) error {
if _, ok := s.Listeners.Get(listener.ID()); ok {
@@ -513,10 +534,11 @@ func (s *Server) publishSysTopics() {
}
// Close attempts to gracefully shutdown the server, all listeners, and clients.
// Close attempts to gracefully shutdown the server, all listeners, clients, and stores.
func (s *Server) Close() error {
close(s.done)
s.Listeners.CloseAll(s.closeListenerClients)
s.CloseStores()
return nil
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/mochi-co/mqtt/server/internal/topics"
"github.com/mochi-co/mqtt/server/listeners"
"github.com/mochi-co/mqtt/server/listeners/auth"
"github.com/mochi-co/mqtt/server/persistence"
)
const defaultPort = ":18882"
@@ -36,6 +37,7 @@ func TestNew(t *testing.T) {
require.NotNil(t, s.Listeners)
require.NotNil(t, s.Clients)
require.NotNil(t, s.Topics)
require.NotNil(t, s.Stores)
require.NotEmpty(t, s.System.Version)
require.Equal(t, true, s.System.Started > 0)
}
@@ -46,6 +48,58 @@ func BenchmarkNew(b *testing.B) {
}
}
func TestServerAddStore(t *testing.T) {
s := New()
require.NotNil(t, s)
p := new(persistence.MockStore)
err := s.AddStore(p)
require.NoError(t, err)
require.Equal(t, 1, len(s.Stores))
require.Equal(t, p, s.Stores[0])
}
func TestServerAddStoreFailure(t *testing.T) {
s := New()
require.NotNil(t, s)
p := new(persistence.MockStore)
p.FailOpen = true
err := s.AddStore(p)
require.Error(t, err)
require.Equal(t, 0, len(s.Stores))
}
func BenchmarkServerAddStore(b *testing.B) {
s := New()
p := new(persistence.MockStore)
for n := 0; n < b.N; n++ {
s.AddStore(p)
}
}
func TestServerCloseStores(t *testing.T) {
s := New()
require.NotNil(t, s)
p := new(persistence.MockStore)
err := s.AddStore(p)
require.NoError(t, err)
p2 := new(persistence.MockStore)
err = s.AddStore(p2)
require.NoError(t, err)
require.Equal(t, false, p.Closed)
require.Equal(t, false, p2.Closed)
s.CloseStores()
require.Equal(t, true, p.Closed)
require.Equal(t, true, p2.Closed)
}
func TestServerAddListener(t *testing.T) {
s := New()
require.NotNil(t, s)
@@ -1109,7 +1163,11 @@ func TestServerClose(t *testing.T) {
cl.Listener = "t1"
s.Clients.Add(cl)
err := s.AddListener(listeners.NewMockListener("t1", ":1882"), nil)
p := new(persistence.MockStore)
err := s.AddStore(p)
require.NoError(t, err)
err = s.AddListener(listeners.NewMockListener("t1", ":1882"), nil)
require.NoError(t, err)
s.Serve()
time.Sleep(time.Millisecond)
@@ -1122,6 +1180,7 @@ func TestServerClose(t *testing.T) {
s.Close()
time.Sleep(time.Millisecond)
require.Equal(t, false, listener.(*listeners.MockListener).IsServing)
require.Equal(t, true, p.Closed)
}
func TestServerCloseClientLWT(t *testing.T) {