diff --git a/server/internal/clients/clients.go b/server/internal/clients/clients.go index ce7d486..95264e1 100644 --- a/server/internal/clients/clients.go +++ b/server/internal/clients/clients.go @@ -138,6 +138,17 @@ func NewClient(c net.Conn, r *circ.Reader, w *circ.Writer, s *system.Info) *Clie return cl } +// NewClientStub returns an instance of Client with basic initializations. This +// method is typically called by the persistence restoration system. +func NewClientStub(s *system.Info) *Client { + return &Client{ + Inflight: Inflight{ + internal: make(map[uint16]InflightMessage), + }, + Subscriptions: make(map[string]byte), + } +} + // Identify sets the identification values of a client instance. func (cl *Client) Identify(lid string, pk packets.Packet, ac auth.Controller) { cl.Listener = lid diff --git a/server/internal/clients/clients_test.go b/server/internal/clients/clients_test.go index bc955d1..08c233d 100644 --- a/server/internal/clients/clients_test.go +++ b/server/internal/clients/clients_test.go @@ -144,6 +144,20 @@ func BenchmarkNewClient(b *testing.B) { } } +func TestNewClientStub(t *testing.T) { + cl := NewClientStub(nil) + + require.NotNil(t, cl) + require.NotNil(t, cl.Inflight.internal) + require.NotNil(t, cl.Subscriptions) +} + +func BenchmarkNewClientStub(b *testing.B) { + for n := 0; n < b.N; n++ { + NewClientStub(nil) + } +} + func TestClientIdentify(t *testing.T) { cl := genClient() diff --git a/server/persistence/bolt/bolt.go b/server/persistence/bolt/bolt.go index 571a487..0f1843c 100644 --- a/server/persistence/bolt/bolt.go +++ b/server/persistence/bolt/bolt.go @@ -123,6 +123,70 @@ func (s *Store) WriteClient(v persistence.Client) error { return nil } +// DeleteSubscription deletes a subscription from the boltdb instance. +func (s *Store) DeleteSubscription(id string) error { + if s.db == nil { + return errors.New("boltdb not opened") + } + + err := s.db.DeleteStruct(&persistence.Subscription{ + ID: id, + }) + if err != nil { + return err + } + + return nil +} + +// DeleteClient deletes a client from the boltdb instance. +func (s *Store) DeleteClient(id string) error { + if s.db == nil { + return errors.New("boltdb not opened") + } + + err := s.db.DeleteStruct(&persistence.Client{ + ID: id, + }) + if err != nil { + return err + } + + return nil +} + +// DeleteInflight deletes an inflight message from the boltdb instance. +func (s *Store) DeleteInflight(id string) error { + if s.db == nil { + return errors.New("boltdb not opened") + } + + err := s.db.DeleteStruct(&persistence.Message{ + ID: id, + }) + if err != nil { + return err + } + + return nil +} + +// DeleteRetained deletes a retained message from the boltdb instance. +func (s *Store) DeleteRetained(id string) error { + if s.db == nil { + return errors.New("boltdb not opened") + } + + err := s.db.DeleteStruct(&persistence.Message{ + ID: id, + }) + if err != nil { + return err + } + + return nil +} + // ReadSubscriptions loads all the subscriptions from the boltdb instance. func (s *Store) ReadSubscriptions() (v []persistence.Subscription, err error) { if s.db == nil { @@ -134,6 +198,7 @@ func (s *Store) ReadSubscriptions() (v []persistence.Subscription, err error) { if err != nil { return } + return } @@ -148,6 +213,7 @@ func (s *Store) ReadClients() (v []persistence.Client, err error) { if err != nil { return } + return } @@ -162,6 +228,7 @@ func (s *Store) ReadInflight() (v []persistence.Message, err error) { if err != nil { return } + return } @@ -176,6 +243,7 @@ func (s *Store) ReadRetained() (v []persistence.Message, err error) { if err != nil { return } + return } @@ -190,5 +258,6 @@ func (s *Store) ReadServerInfo() (v persistence.ServerInfo, err error) { if err != nil { return } + return } diff --git a/server/persistence/bolt/bolt_test.go b/server/persistence/bolt/bolt_test.go index b530ebe..6df5026 100644 --- a/server/persistence/bolt/bolt_test.go +++ b/server/persistence/bolt/bolt_test.go @@ -119,7 +119,7 @@ func TestReadServerInfoFail(t *testing.T) { require.Error(t, err) } -func TestWriteAndRetrieveSubscription(t *testing.T) { +func TestWriteRetrieveDeleteSubscription(t *testing.T) { s := New(tmpPath, nil) err := s.Open() require.NoError(t, err) @@ -149,6 +149,13 @@ func TestWriteAndRetrieveSubscription(t *testing.T) { require.NoError(t, err) require.Equal(t, persistence.KSubscription, subs[0].T) require.Equal(t, 2, len(subs)) + + err = s.DeleteSubscription("test:d/e/f") + require.NoError(t, err) + + subs, err = s.ReadSubscriptions() + require.NoError(t, err) + require.Equal(t, 1, len(subs)) } func TestWriteSubscriptionNoDB(t *testing.T) { @@ -187,7 +194,7 @@ func TestReadSubscriptionFail(t *testing.T) { require.Error(t, err) } -func TestWriteAndRetrieveInflight(t *testing.T) { +func TestWriteRetrieveDeleteInflight(t *testing.T) { s := New(tmpPath, nil) err := s.Open() require.NoError(t, err) @@ -221,6 +228,14 @@ func TestWriteAndRetrieveInflight(t *testing.T) { require.NoError(t, err) require.Equal(t, persistence.KInflight, msgs[0].T) require.Equal(t, 2, len(msgs)) + + err = s.DeleteInflight("client1_if_100") + require.NoError(t, err) + + msgs, err = s.ReadInflight() + require.NoError(t, err) + require.Equal(t, 1, len(msgs)) + } func TestWriteInflightNoDB(t *testing.T) { @@ -259,7 +274,7 @@ func TestReadInflightFail(t *testing.T) { require.Error(t, err) } -func TestWriteAndRetrieveRetained(t *testing.T) { +func TestWriteRetrieveDeleteRetained(t *testing.T) { s := New(tmpPath, nil) err := s.Open() require.NoError(t, err) @@ -300,6 +315,13 @@ func TestWriteAndRetrieveRetained(t *testing.T) { require.Equal(t, persistence.KRetained, msgs[0].T) require.Equal(t, true, msgs[0].FixedHeader.Retain) require.Equal(t, 2, len(msgs)) + + err = s.DeleteRetained("client1_ret_300") + require.NoError(t, err) + + msgs, err = s.ReadRetained() + require.NoError(t, err) + require.Equal(t, 1, len(msgs)) } func TestWriteRetainedNoDB(t *testing.T) { @@ -338,7 +360,7 @@ func TestReadRetainedFail(t *testing.T) { require.Error(t, err) } -func TestWriteAndRetrieveClients(t *testing.T) { +func TestWriteRetrieveDeleteClients(t *testing.T) { s := New(tmpPath, nil) err := s.Open() require.NoError(t, err) @@ -383,6 +405,13 @@ func TestWriteAndRetrieveClients(t *testing.T) { require.NoError(t, err) require.Equal(t, persistence.KClient, clients[0].T) require.Equal(t, 2, len(clients)) + + err = s.DeleteClient("client2") + require.NoError(t, err) + + clients, err = s.ReadClients() + require.NoError(t, err) + require.Equal(t, 1, len(clients)) } func TestWriteClientNoDB(t *testing.T) { @@ -420,3 +449,75 @@ func TestReadClientFail(t *testing.T) { _, err = s.ReadClients() require.Error(t, err) } + +func TestDeleteSubscriptionNoDB(t *testing.T) { + s := New(tmpPath, nil) + err := s.DeleteSubscription("a") + require.Error(t, err) +} + +func TestDeleteSubscriptionFail(t *testing.T) { + s := New(tmpPath, nil) + err := s.Open() + require.NoError(t, err) + + err = os.Remove(tmpPath) + require.NoError(t, err) + + err = s.DeleteSubscription("a") + require.Error(t, err) +} + +func TestDeleteClientNoDB(t *testing.T) { + s := New(tmpPath, nil) + err := s.DeleteClient("a") + require.Error(t, err) +} + +func TestDeleteClientFail(t *testing.T) { + s := New(tmpPath, nil) + err := s.Open() + require.NoError(t, err) + + err = os.Remove(tmpPath) + require.NoError(t, err) + + err = s.DeleteClient("a") + require.Error(t, err) +} + +func TestDeleteInflightNoDB(t *testing.T) { + s := New(tmpPath, nil) + err := s.DeleteInflight("a") + require.Error(t, err) +} + +func TestDeleteInflightFail(t *testing.T) { + s := New(tmpPath, nil) + err := s.Open() + require.NoError(t, err) + + err = os.Remove(tmpPath) + require.NoError(t, err) + + err = s.DeleteInflight("a") + require.Error(t, err) +} + +func TestDeleteRetainedNoDB(t *testing.T) { + s := New(tmpPath, nil) + err := s.DeleteRetained("a") + require.Error(t, err) +} + +func TestDeleteRetainedFail(t *testing.T) { + s := New(tmpPath, nil) + err := s.Open() + require.NoError(t, err) + + err = os.Remove(tmpPath) + require.NoError(t, err) + + err = s.DeleteRetained("a") + require.Error(t, err) +} diff --git a/server/persistence/persistence.go b/server/persistence/persistence.go index fd9c5f0..c5e994c 100644 --- a/server/persistence/persistence.go +++ b/server/persistence/persistence.go @@ -23,6 +23,12 @@ type Store interface { WriteInflight(v Message) error WriteServerInfo(v ServerInfo) error WriteRetained(v Message) error + + DeleteSubscription(id string) error + DeleteClient(id string) error + DeleteInflight(id string) error + DeleteRetained(id string) error + ReadSubscriptions() (v []Subscription, err error) ReadInflight() (v []Message, err error) ReadRetained() (v []Message, err error) @@ -91,7 +97,7 @@ type MockStore struct { FailOpen bool Closed bool Opened bool - Fail bool + Fail map[string]bool } // Open opens the storage instance. @@ -111,7 +117,7 @@ func (s *MockStore) Close() { // WriteSubscription writes a single subscription to the storage instance. func (s *MockStore) WriteSubscription(v Subscription) error { - if s.Fail { + if _, ok := s.Fail["write_subs"]; ok { return errors.New("test") } return nil @@ -119,7 +125,7 @@ func (s *MockStore) WriteSubscription(v Subscription) error { // WriteClient writes a single client to the storage instance. func (s *MockStore) WriteClient(v Client) error { - if s.Fail { + if _, ok := s.Fail["write_clients"]; ok { return errors.New("test") } return nil @@ -127,7 +133,7 @@ func (s *MockStore) WriteClient(v Client) error { // WriteInFlight writes a single InFlight message to the storage instance. func (s *MockStore) WriteInflight(v Message) error { - if s.Fail { + if _, ok := s.Fail["write_inflight"]; ok { return errors.New("test") } return nil @@ -135,7 +141,7 @@ func (s *MockStore) WriteInflight(v Message) error { // WriteRetained writes a single retained message to the storage instance. func (s *MockStore) WriteRetained(v Message) error { - if s.Fail { + if _, ok := s.Fail["write_retained"]; ok { return errors.New("test") } return nil @@ -143,52 +149,137 @@ func (s *MockStore) WriteRetained(v Message) error { // WriteServerInfo writes server info to the storage instance. func (s *MockStore) WriteServerInfo(v ServerInfo) error { - if s.Fail { + if _, ok := s.Fail["write_info"]; ok { return errors.New("test") } return nil } +// DeleteSubscription deletes a subscription from the persistent store. +func (s *MockStore) DeleteSubscription(id string) error { + if _, ok := s.Fail["delete_subs"]; ok { + return errors.New("test") + } + + return nil +} + +// DeleteClient deletes a client from the persistent store. +func (s *MockStore) DeleteClient(id string) error { + if _, ok := s.Fail["delete_clients"]; ok { + return errors.New("test") + } + + return nil +} + +// DeleteInflight deletes an inflight message from the persistent store. +func (s *MockStore) DeleteInflight(id string) error { + if _, ok := s.Fail["delete_inflight"]; ok { + return errors.New("test") + } + + return nil +} + +// DeleteRetained deletes a retained message from the persistent store. +func (s *MockStore) DeleteRetained(id string) error { + if _, ok := s.Fail["delete_retained"]; ok { + return errors.New("test") + } + + return nil +} + // ReadSubscriptions loads the subscriptions from the storage instance. func (s *MockStore) ReadSubscriptions() (v []Subscription, err error) { - if s.Fail { - return v, errors.New("test") + if _, ok := s.Fail["read_subs"]; ok { + return v, errors.New("test_subs") } - return + + return []Subscription{ + Subscription{ + ID: "test:a/b/c", + Client: "test", + Filter: "a/b/c", + QoS: 1, + T: KSubscription, + }, + }, nil } // ReadClients loads the clients from the storage instance. func (s *MockStore) ReadClients() (v []Client, err error) { - if s.Fail { - return v, errors.New("test") + if _, ok := s.Fail["read_clients"]; ok { + return v, errors.New("test_clients") } - return + + return []Client{ + Client{ + ID: "client1", + T: KClient, + Listener: "tcp1", + CleanSession: true, + Subscriptions: map[string]byte{ + "a/b/c": 0, + "d/e/f": 1, + }, + }, + }, nil } // ReadInflight loads the inflight messages from the storage instance. func (s *MockStore) ReadInflight() (v []Message, err error) { - if s.Fail { - return v, errors.New("test") + if _, ok := s.Fail["read_inflight"]; ok { + return v, errors.New("test_inflight") } - return + + return []Message{ + Message{ + ID: "client1_if_100", + T: KInflight, + Client: "client1", + PacketID: 100, + TopicName: "d/e/f", + Payload: []byte{'y', 'e', 's'}, + Sent: 200, + Resends: 1, + }, + }, nil } // ReadRetained loads the retained messages from the storage instance. func (s *MockStore) ReadRetained() (v []Message, err error) { - if s.Fail { - return v, errors.New("test") + if _, ok := s.Fail["read_retained"]; ok { + return v, errors.New("test_retained") } - return + + return []Message{ + Message{ + ID: "client1_ret_200", + T: KRetained, + FixedHeader: FixedHeader{ + Retain: true, + }, + PacketID: 200, + TopicName: "a/b/c", + Payload: []byte{'h', 'e', 'l', 'l', 'o'}, + Sent: 100, + Resends: 0, + }, + }, nil } //ReadServerInfo loads the server info from the storage instance. func (s *MockStore) ReadServerInfo() (v ServerInfo, err error) { - if s.Fail { - return v, errors.New("test") + if _, ok := s.Fail["read_info"]; ok { + return v, errors.New("test_info") } + return ServerInfo{ system.Info{ Version: "test", + Started: 100, }, KServerInfo, }, nil diff --git a/server/persistence/persistence_test.go b/server/persistence/persistence_test.go index 5297242..00fd02d 100644 --- a/server/persistence/persistence_test.go +++ b/server/persistence/persistence_test.go @@ -33,8 +33,11 @@ func TestMockStoreWriteSubscription(t *testing.T) { } func TestMockStoreWriteSubscriptionFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "write_subs": true, + }, + } err := s.WriteSubscription(Subscription{}) require.Error(t, err) } @@ -46,8 +49,11 @@ func TestMockStoreWriteClient(t *testing.T) { } func TestMockStoreWriteClientFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "write_clients": true, + }, + } err := s.WriteClient(Client{}) require.Error(t, err) } @@ -59,8 +65,11 @@ func TestMockStoreWriteInflight(t *testing.T) { } func TestMockStoreWriteInflightFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "write_inflight": true, + }, + } err := s.WriteInflight(Message{}) require.Error(t, err) } @@ -72,8 +81,11 @@ func TestMockStoreWriteRetained(t *testing.T) { } func TestMockStoreWriteRetainedFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "write_retained": true, + }, + } err := s.WriteRetained(Message{}) require.Error(t, err) } @@ -85,12 +97,79 @@ func TestMockStoreWriteServerInfo(t *testing.T) { } func TestMockStoreWriteServerInfoFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "write_info": true, + }, + } err := s.WriteServerInfo(ServerInfo{}) require.Error(t, err) } +func TestMockStoreDeleteSubscription(t *testing.T) { + s := new(MockStore) + err := s.DeleteSubscription("client1:d/e/f") + require.NoError(t, err) +} + +func TestMockStoreDeleteSubscriptionFail(t *testing.T) { + s := &MockStore{ + Fail: map[string]bool{ + "delete_subs": true, + }, + } + err := s.DeleteSubscription("client1:a/b/c") + require.Error(t, err) +} + +func TestMockStoreDeleteClient(t *testing.T) { + s := new(MockStore) + err := s.DeleteClient("client1") + require.NoError(t, err) +} + +func TestMockStoreDeleteClientFail(t *testing.T) { + s := &MockStore{ + Fail: map[string]bool{ + "delete_clients": true, + }, + } + err := s.DeleteClient("client1") + require.Error(t, err) +} + +func TestMockStoreDeleteInflight(t *testing.T) { + s := new(MockStore) + err := s.DeleteInflight("client1-if-100") + require.NoError(t, err) +} + +func TestMockStoreDeleteInflightFail(t *testing.T) { + s := &MockStore{ + Fail: map[string]bool{ + "delete_inflight": true, + }, + } + err := s.DeleteInflight("client1-if-100") + require.Error(t, err) +} + +func TestMockStoreDeleteRetained(t *testing.T) { + s := new(MockStore) + err := s.DeleteRetained("client1-ret-100") + require.NoError(t, err) +} + +func TestMockStoreDeleteRetainedFail(t *testing.T) { + s := &MockStore{ + Fail: map[string]bool{ + "delete_retained": true, + }, + } + err := s.DeleteRetained("client1-ret-100") + require.Error(t, err) +} + func TestMockStorReadServerInfo(t *testing.T) { s := new(MockStore) _, err := s.ReadServerInfo() @@ -98,8 +177,11 @@ func TestMockStorReadServerInfo(t *testing.T) { } func TestMockStorReadServerInfoFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "read_info": true, + }, + } _, err := s.ReadServerInfo() require.Error(t, err) } @@ -111,8 +193,11 @@ func TestMockStoreReadSubscriptions(t *testing.T) { } func TestMockStoreReadSubscriptionsFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "read_subs": true, + }, + } _, err := s.ReadSubscriptions() require.Error(t, err) } @@ -124,8 +209,11 @@ func TestMockStoreReadClients(t *testing.T) { } func TestMockStoreReadClientsFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "read_clients": true, + }, + } _, err := s.ReadClients() require.Error(t, err) } @@ -137,8 +225,11 @@ func TestMockStoreReadInflight(t *testing.T) { } func TestMockStoreReadInflightFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "read_inflight": true, + }, + } _, err := s.ReadInflight() require.Error(t, err) } @@ -150,8 +241,11 @@ func TestMockStoreReadRetained(t *testing.T) { } func TestMockStoreReadRetainedFail(t *testing.T) { - s := new(MockStore) - s.Fail = true + s := &MockStore{ + Fail: map[string]bool{ + "read_retained": true, + }, + } _, err := s.ReadRetained() require.Error(t, err) } diff --git a/server/server.go b/server/server.go index 6f53c8a..baa2aaf 100644 --- a/server/server.go +++ b/server/server.go @@ -165,13 +165,14 @@ func (s *Server) loadSubscriptions(v []persistence.Subscription) { // loadClients restores clients from the datastore. func (s *Server) loadClients(v []persistence.Client) { for _, cl := range v { - s.Clients.Add(&clients.Client{ - ID: cl.ID, - Listener: cl.Listener, - Username: cl.Username, - LWT: clients.LWT(cl.LWT), - Subscriptions: cl.Subscriptions, - }) + + c := clients.NewClientStub(s.System) + c.ID = cl.ID + c.Listener = cl.Listener + c.Username = cl.Username + c.LWT = clients.LWT(cl.LWT) + c.Subscriptions = cl.Subscriptions + s.Clients.Add(c) } } @@ -320,9 +321,6 @@ func (s *Server) writeClient(cl *clients.Client, pk packets.Packet) error { return err } - // Log $SYS stats. - // @TODO ... - return nil } diff --git a/server/server_test.go b/server/server_test.go index 61194ee..ec96bd7 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1,6 +1,7 @@ package server import ( + "errors" "io/ioutil" "net" "strconv" @@ -124,19 +125,58 @@ func TestServerReadStore(t *testing.T) { require.NotNil(t, s) s.Store = new(persistence.MockStore) - err := s.readStore() require.NoError(t, err) + + require.Equal(t, int64(100), s.System.Started) + require.Equal(t, topics.Subscriptions{"test": 1}, s.Topics.Subscribers("a/b/c")) + + cl1, ok := s.Clients.Get("client1") + require.Equal(t, true, ok) + + msg, ok := cl1.Inflight.Get(100) + require.Equal(t, true, ok) + require.Equal(t, []byte{'y', 'e', 's'}, msg.Packet.Payload) + } -func TestServerReadStoreFailure(t *testing.T) { +func TestServerReadStoreFailures(t *testing.T) { s := New() require.NotNil(t, s) s.Store = new(persistence.MockStore) + s.Store.(*persistence.MockStore).Fail = map[string]bool{ + "read_subs": true, + "read_clients": true, + "read_inflight": true, + "read_retained": true, + "read_info": true, + } err := s.readStore() - require.NoError(t, err) + require.Error(t, err) + require.Equal(t, errors.New("test_info"), err) + delete(s.Store.(*persistence.MockStore).Fail, "read_info") + + err = s.readStore() + require.Error(t, err) + require.Equal(t, errors.New("test_subs"), err) + delete(s.Store.(*persistence.MockStore).Fail, "read_subs") + + err = s.readStore() + require.Error(t, err) + require.Equal(t, errors.New("test_clients"), err) + delete(s.Store.(*persistence.MockStore).Fail, "read_clients") + + err = s.readStore() + require.Error(t, err) + require.Equal(t, errors.New("test_inflight"), err) + delete(s.Store.(*persistence.MockStore).Fail, "read_inflight") + + err = s.readStore() + require.Error(t, err) + require.Equal(t, errors.New("test_retained"), err) + delete(s.Store.(*persistence.MockStore).Fail, "read_retained") } func TestServerLoadServerInfo(t *testing.T) { @@ -315,7 +355,8 @@ func TestServerServe(t *testing.T) { err := s.AddListener(listeners.NewMockListener("t1", ":1882"), nil) require.NoError(t, err) - s.Serve() + err = s.Serve() + require.NoError(t, err) time.Sleep(time.Millisecond) require.Equal(t, 1, s.Listeners.Len()) listener, ok := s.Listeners.Get("t1") @@ -323,6 +364,17 @@ func TestServerServe(t *testing.T) { require.Equal(t, true, listener.(*listeners.MockListener).IsServing) } +func TestServerServeFail(t *testing.T) { + s := New() + s.Store = new(persistence.MockStore) + s.Store.(*persistence.MockStore).Fail = map[string]bool{ + "read_subs": true, + } + + err := s.Serve() + require.Error(t, err) +} + func BenchmarkServerServe(b *testing.B) { s := New() l := listeners.NewMockListener("t1", ":1882")