Add OnRetainPublished hook (#237)

* Add OnRetainPublished hook

* Skip OnRetainPublished if publish error
This commit is contained in:
thedevop
2023-06-13 11:24:04 -07:00
committed by GitHub
parent af79b55b9f
commit 1ee2158711
3 changed files with 17 additions and 0 deletions

View File

@@ -41,6 +41,7 @@ const (
OnPublished
OnPublishDropped
OnRetainMessage
OnRetainPublished
OnQosPublish
OnQosComplete
OnQosDropped
@@ -91,6 +92,7 @@ type Hook interface {
OnPublished(cl *Client, pk packets.Packet)
OnPublishDropped(cl *Client, pk packets.Packet)
OnRetainMessage(cl *Client, pk packets.Packet, r int64)
OnRetainPublished(cl *Client, pk packets.Packet)
OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
OnQosComplete(cl *Client, pk packets.Packet)
OnQosDropped(cl *Client, pk packets.Packet)
@@ -417,6 +419,15 @@ func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {
}
}
// OnRetainPublished is called when a retained message is published.
func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet) {
for _, hook := range h.GetAll() {
if hook.Provides(OnRetainPublished) {
hook.OnRetainPublished(cl, pk)
}
}
}
// OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber.
// In other words, this method is called when a new inflight message is created or resent.
// It is typically used to store a new inflight message.
@@ -758,6 +769,9 @@ func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) {}
// OnRetainMessage is called then a published message is retained.
func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64) {}
// OnRetainPublished is called when a retained message is published.
func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet) {}
// OnQosPublish is called when a publish packet with Qos > 1 is issued to a subscriber.
func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) {}

View File

@@ -238,6 +238,7 @@ func TestHooksNonReturns(t *testing.T) {
h.OnPublished(cl, packets.Packet{})
h.OnPublishDropped(cl, packets.Packet{})
h.OnRetainMessage(cl, packets.Packet{}, 0)
h.OnRetainPublished(cl, packets.Packet{})
h.OnQosPublish(cl, packets.Packet{}, time.Now().Unix(), 0)
h.OnQosComplete(cl, packets.Packet{})
h.OnQosDropped(cl, packets.Packet{})

View File

@@ -880,7 +880,9 @@ func (s *Server) publishRetainedToClient(cl *Client, sub packets.Subscription, e
_, err := s.publishToClient(cl, sub, pkv)
if err != nil {
s.Log.Debug().Err(err).Str("client", cl.ID).Str("listener", cl.Net.Listener).Interface("packet", pkv).Msg("failed to publish retained message")
continue
}
s.hooks.OnRetainPublished(cl, pkv)
}
}