mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
Add retainMessage to LWT to properly handle message retention (#234)
* Add retainMessage to LWT to properly handle message retention if specified in connect * Add will retain flag on missed test --------- Co-authored-by: Derek Duncan <derekduncan@gmail.com>
This commit is contained in:
@@ -1285,6 +1285,10 @@ func (s *Server) sendLWT(cl *Client) {
|
||||
return
|
||||
}
|
||||
|
||||
if pk.FixedHeader.Retain {
|
||||
s.retainMessage(cl, pk)
|
||||
}
|
||||
|
||||
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
|
||||
atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
|
||||
s.hooks.OnWillSent(cl, pk)
|
||||
@@ -1477,6 +1481,9 @@ func (s *Server) sendDelayedLWT(dt int64) {
|
||||
if dt > pk.Expiry {
|
||||
s.publishToSubscribers(pk) // [MQTT-3.1.2-8]
|
||||
if cl, ok := s.Clients.Get(id); ok {
|
||||
if pk.FixedHeader.Retain {
|
||||
s.retainMessage(cl, pk)
|
||||
}
|
||||
cl.Properties.Will = Will{} // [MQTT-3.1.2-10]
|
||||
s.hooks.OnWillSent(cl, pk)
|
||||
}
|
||||
|
@@ -2586,6 +2586,46 @@ func TestServerSendLWT(t *testing.T) {
|
||||
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).RawBytes, <-receiverBuf)
|
||||
}
|
||||
|
||||
func TestServerSendLWTRetain(t *testing.T) {
|
||||
s := newServer()
|
||||
s.Serve()
|
||||
defer s.Close()
|
||||
|
||||
sender, _, w1 := newTestClient()
|
||||
sender.ID = "sender"
|
||||
sender.Properties.Will = Will{
|
||||
Flag: 1,
|
||||
TopicName: "a/b/c",
|
||||
Payload: []byte("hello mochi"),
|
||||
Retain: true,
|
||||
}
|
||||
s.Clients.Add(sender)
|
||||
|
||||
receiver, r2, w2 := newTestClient()
|
||||
receiver.ID = "receiver"
|
||||
s.Clients.Add(receiver)
|
||||
s.Topics.Subscribe(receiver.ID, packets.Subscription{Filter: "a/b/c", Qos: 0})
|
||||
|
||||
require.Equal(t, int64(0), atomic.LoadInt64(&s.Info.PacketsReceived))
|
||||
require.Equal(t, 0, len(s.Topics.Messages("a/b/c")))
|
||||
|
||||
receiverBuf := make(chan []byte)
|
||||
go func() {
|
||||
buf, err := io.ReadAll(r2)
|
||||
require.NoError(t, err)
|
||||
receiverBuf <- buf
|
||||
}()
|
||||
|
||||
go func() {
|
||||
s.sendLWT(sender)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
w1.Close()
|
||||
w2.Close()
|
||||
}()
|
||||
|
||||
require.Equal(t, packets.TPacketData[packets.Publish].Get(packets.TPublishRetain).RawBytes, <-receiverBuf)
|
||||
}
|
||||
|
||||
func TestServerSendLWTDelayed(t *testing.T) {
|
||||
s := newServer()
|
||||
cl1, _, _ := newTestClient()
|
||||
|
Reference in New Issue
Block a user