From ed7fd836e17a3786c7fd2e7e79a51a520f9a6e64 Mon Sep 17 00:00:00 2001 From: werben Date: Sat, 22 Apr 2023 04:52:44 +0800 Subject: [PATCH] #78 storage hook should not execute the relevant code if the client has been reconnected (#198) * storage hook should not execute the relevant code if the client has been reconnected #78 * add test cases for coverage decrease add test cases for coverage decrease --- hooks/storage/badger/badger.go | 4 ++++ hooks/storage/badger/badger_test.go | 23 +++++++++++++++++++++++ hooks/storage/bolt/bolt.go | 4 ++++ hooks/storage/bolt/bolt_test.go | 23 +++++++++++++++++++++++ hooks/storage/redis/redis.go | 4 ++++ hooks/storage/redis/redis_test.go | 22 ++++++++++++++++++++++ 6 files changed, 80 insertions(+) diff --git a/hooks/storage/badger/badger.go b/hooks/storage/badger/badger.go index 1bc05cd..df2ab67 100644 --- a/hooks/storage/badger/badger.go +++ b/hooks/storage/badger/badger.go @@ -182,6 +182,10 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) { return } + if cl.StopCause() == packets.ErrSessionTakenOver { + return + } + err := h.db.Delete(clientKey(cl), new(storage.Client)) if err != nil { h.Log.Error().Err(err).Interface("data", clientKey(cl)).Msg("failed to delete client data") diff --git a/hooks/storage/badger/badger_test.go b/hooks/storage/badger/badger_test.go index 2366468..06794be 100644 --- a/hooks/storage/badger/badger_test.go +++ b/hooks/storage/badger/badger_test.go @@ -232,6 +232,29 @@ func TestOnDisconnectClosedDB(t *testing.T) { h.OnDisconnect(client, nil, false) } +func TestOnDisconnectSessionTakenOver(t *testing.T) { + h := new(Hook) + h.SetOpts(&logger, nil) + err := h.Init(nil) + require.NoError(t, err) + + testClient := &mqtt.Client{ + ID: "test", + Net: mqtt.ClientConnection{ + Remote: "test.addr", + Listener: "listener", + }, + Properties: mqtt.ClientProperties{ + Username: []byte("username"), + Clean: false, + }, + } + + testClient.Stop(packets.ErrSessionTakenOver) + teardown(t, h.config.Path, h) + h.OnDisconnect(testClient, nil, true) +} + func TestOnSubscribedThenOnUnsubscribed(t *testing.T) { h := new(Hook) h.SetOpts(&logger, nil) diff --git a/hooks/storage/bolt/bolt.go b/hooks/storage/bolt/bolt.go index 3dc9345..3de6b4e 100644 --- a/hooks/storage/bolt/bolt.go +++ b/hooks/storage/bolt/bolt.go @@ -184,6 +184,10 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) { return } + if cl.StopCause() == packets.ErrSessionTakenOver { + return + } + err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)}) if err != nil && !errors.Is(err, storm.ErrNotFound) { h.Log.Error().Err(err).Str("id", clientKey(cl)).Msg("failed to delete client") diff --git a/hooks/storage/bolt/bolt_test.go b/hooks/storage/bolt/bolt_test.go index c491a5d..57da7e4 100644 --- a/hooks/storage/bolt/bolt_test.go +++ b/hooks/storage/bolt/bolt_test.go @@ -241,6 +241,29 @@ func TestOnDisconnectClosedDB(t *testing.T) { h.OnDisconnect(client, nil, false) } +func TestOnDisconnectSessionTakenOver(t *testing.T) { + h := new(Hook) + h.SetOpts(&logger, nil) + err := h.Init(nil) + require.NoError(t, err) + + testClient := &mqtt.Client{ + ID: "test", + Net: mqtt.ClientConnection{ + Remote: "test.addr", + Listener: "listener", + }, + Properties: mqtt.ClientProperties{ + Username: []byte("username"), + Clean: false, + }, + } + + testClient.Stop(packets.ErrSessionTakenOver) + teardown(t, h.config.Path, h) + h.OnDisconnect(testClient, nil, true) +} + func TestOnSubscribedThenOnUnsubscribed(t *testing.T) { h := new(Hook) h.SetOpts(&logger, nil) diff --git a/hooks/storage/redis/redis.go b/hooks/storage/redis/redis.go index 0df8edc..574c582 100644 --- a/hooks/storage/redis/redis.go +++ b/hooks/storage/redis/redis.go @@ -199,6 +199,10 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) { return } + if cl.StopCause() == packets.ErrSessionTakenOver { + return + } + err := h.db.HDel(h.ctx, h.hKey(storage.ClientKey), clientKey(cl)).Err() if err != nil { h.Log.Error().Err(err).Str("id", clientKey(cl)).Msg("failed to delete client") diff --git a/hooks/storage/redis/redis_test.go b/hooks/storage/redis/redis_test.go index 0d8fbcb..bcbd0cb 100644 --- a/hooks/storage/redis/redis_test.go +++ b/hooks/storage/redis/redis_test.go @@ -285,6 +285,28 @@ func TestOnDisconnectClosedDB(t *testing.T) { h.OnDisconnect(client, nil, false) } +func TestOnDisconnectSessionTakenOver(t *testing.T) { + s := miniredis.RunT(t) + defer s.Close() + h := newHook(t, s.Addr()) + + testClient := &mqtt.Client{ + ID: "test", + Net: mqtt.ClientConnection{ + Remote: "test.addr", + Listener: "listener", + }, + Properties: mqtt.ClientProperties{ + Username: []byte("username"), + Clean: false, + }, + } + + testClient.Stop(packets.ErrSessionTakenOver) + teardown(t, h) + h.OnDisconnect(testClient, nil, true) +} + func TestOnSubscribedThenOnUnsubscribed(t *testing.T) { s := miniredis.RunT(t) defer s.Close()