diff --git a/clients.go b/clients.go index 03260b2..5cec6bc 100644 --- a/clients.go +++ b/clients.go @@ -272,7 +272,7 @@ func (cl *Client) ResendInflightMessages(force bool) error { tk.FixedHeader.Dup = true // [MQTT-3.3.1-1] [MQTT-3.3.1-3] } - // cl.ops.hooks.OnQosPublish(cl, tk.Packet, nt, tk.Resends) + cl.ops.hooks.OnQosPublish(cl, tk, tk.Created, 0) err := cl.WritePacket(tk) if err != nil { return err diff --git a/examples/debug/main.go b/examples/debug/main.go index a2bc1d9..d0dd5b5 100644 --- a/examples/debug/main.go +++ b/examples/debug/main.go @@ -36,7 +36,7 @@ func main() { } err = server.AddHook(new(debug.Hook), &debug.Options{ - ShowPacketData: true, + // ShowPacketData: true, }) if err != nil { log.Fatal(err) diff --git a/inflight.go b/inflight.go index b3ae26b..e0ae49e 100644 --- a/inflight.go +++ b/inflight.go @@ -104,14 +104,14 @@ func (i *Inflight) Delete(id uint16) bool { } // TakeRecieveQuota reduces the receive quota by 1. -func (i *Inflight) TakeReceiveQuota() { +func (i *Inflight) DecreaseReceiveQuota() { if atomic.LoadInt32(&i.receiveQuota) > 0 { atomic.AddInt32(&i.receiveQuota, -1) } } // TakeRecieveQuota increases the receive quota by 1. -func (i *Inflight) ReturnReceiveQuota() { +func (i *Inflight) IncreaseReceiveQuota() { if atomic.LoadInt32(&i.receiveQuota) < atomic.LoadInt32(&i.maximumReceiveQuota) { atomic.AddInt32(&i.receiveQuota, 1) } @@ -123,15 +123,15 @@ func (i *Inflight) ResetReceiveQuota(n int32) { atomic.StoreInt32(&i.maximumReceiveQuota, n) } -// TakeSendQuota reduces the send quota by 1. -func (i *Inflight) TakeSendQuota() { +// DecreaseSendQuota reduces the send quota by 1. +func (i *Inflight) DecreaseSendQuota() { if atomic.LoadInt32(&i.sendQuota) > 0 { atomic.AddInt32(&i.sendQuota, -1) } } -// ReturnSendQuota increases the send quota by 1. -func (i *Inflight) ReturnSendQuota() { +// IncreaseSendQuota increases the send quota by 1. +func (i *Inflight) IncreaseSendQuota() { if atomic.LoadInt32(&i.sendQuota) < atomic.LoadInt32(&i.maximumSendQuota) { atomic.AddInt32(&i.sendQuota, 1) } diff --git a/inflight_test.go b/inflight_test.go index 1028796..77d51f5 100644 --- a/inflight_test.go +++ b/inflight_test.go @@ -95,12 +95,12 @@ func TestReceiveQuota(t *testing.T) { require.Equal(t, int32(4), atomic.LoadInt32(&i.receiveQuota)) // Return 1 - i.ReturnReceiveQuota() + i.IncreaseReceiveQuota() require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota)) require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota)) // Try to go over max limit - i.ReturnReceiveQuota() + i.IncreaseReceiveQuota() require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota)) require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota)) @@ -110,12 +110,12 @@ func TestReceiveQuota(t *testing.T) { require.Equal(t, int32(1), atomic.LoadInt32(&i.receiveQuota)) // Take 1 - i.TakeReceiveQuota() + i.DecreaseReceiveQuota() require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota)) require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota)) // Try to go below zero - i.TakeReceiveQuota() + i.DecreaseReceiveQuota() require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota)) require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota)) } @@ -137,12 +137,12 @@ func TestSendQuota(t *testing.T) { require.Equal(t, int32(4), atomic.LoadInt32(&i.sendQuota)) // Return 1 - i.ReturnSendQuota() + i.IncreaseSendQuota() require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota)) require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota)) // Try to go over max limit - i.ReturnSendQuota() + i.IncreaseSendQuota() require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota)) require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota)) @@ -152,12 +152,12 @@ func TestSendQuota(t *testing.T) { require.Equal(t, int32(1), atomic.LoadInt32(&i.sendQuota)) // Take 1 - i.TakeSendQuota() + i.DecreaseSendQuota() require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota)) require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota)) // Try to go below zero - i.TakeSendQuota() + i.DecreaseSendQuota() require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota)) require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota)) } diff --git a/server.go b/server.go index 6b0cfe4..ae7aaca 100644 --- a/server.go +++ b/server.go @@ -607,7 +607,7 @@ func (s *Server) processPacket(cl *Client, pk packets.Packet) error { if ok := cl.State.Inflight.Delete(next.PacketID); ok { atomic.AddInt64(&s.Info.Inflight, -1) } - cl.State.Inflight.TakeSendQuota() + cl.State.Inflight.DecreaseSendQuota() } } @@ -722,7 +722,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error { return nil } - cl.State.Inflight.TakeReceiveQuota() + cl.State.Inflight.DecreaseReceiveQuota() ack := s.buildAck(pk.PacketID, packets.Puback, 0, pk.Properties, packets.QosCodes[pk.FixedHeader.Qos]) // [MQTT-4.3.2-4] if pk.FixedHeader.Qos == 2 { ack = s.buildAck(pk.PacketID, packets.Pubrec, 0, pk.Properties, packets.CodeSuccess) // [MQTT-3.3.4-1] [MQTT-4.3.3-8] @@ -741,7 +741,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error { if ok := cl.State.Inflight.Delete(ack.PacketID); ok { atomic.AddInt64(&s.Info.Inflight, -1) } - cl.State.Inflight.ReturnReceiveQuota() + cl.State.Inflight.IncreaseReceiveQuota() s.hooks.OnQosComplete(cl, pk) } @@ -838,6 +838,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet if ok := cl.State.Inflight.Set(out); ok { // [MQTT-4.3.2-3] [MQTT-4.3.3-3] atomic.AddInt64(&s.Info.Inflight, 1) s.hooks.OnQosPublish(cl, out, out.Created, 0) + cl.State.Inflight.DecreaseSendQuota() } if sentQuota == 0 && atomic.LoadInt32(&cl.State.Inflight.maximumSendQuota) > 0 { @@ -851,8 +852,6 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet return pk, packets.CodeDisconnect } - cl.State.Inflight.TakeSendQuota() - return out, cl.WritePacket(out) } @@ -902,7 +901,7 @@ func (s *Server) processPuback(cl *Client, pk packets.Packet) error { } if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.2-5] - cl.State.Inflight.ReturnSendQuota() + cl.State.Inflight.IncreaseSendQuota() atomic.AddInt64(&s.Info.Inflight, -1) s.hooks.OnQosComplete(cl, pk) } @@ -925,7 +924,7 @@ func (s *Server) processPubrec(cl *Client, pk packets.Packet) error { } ack := s.buildAck(pk.PacketID, packets.Pubrel, 1, pk.Properties, packets.CodeSuccess) // [MQTT-4.3.3-4] ![MQTT-4.3.3-6] - cl.State.Inflight.TakeReceiveQuota() // -1 RECV QUOTA + cl.State.Inflight.DecreaseReceiveQuota() // -1 RECV QUOTA cl.State.Inflight.Set(ack) // [MQTT-4.3.3-5] return cl.WritePacket(ack) } @@ -952,8 +951,8 @@ func (s *Server) processPubrel(cl *Client, pk packets.Packet) error { return err } - cl.State.Inflight.ReturnReceiveQuota() // +1 RECV QUOTA - cl.State.Inflight.ReturnSendQuota() // +1 SENT QUOTA + cl.State.Inflight.IncreaseReceiveQuota() // +1 RECV QUOTA + cl.State.Inflight.IncreaseSendQuota() // +1 SENT QUOTA if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.3-12] atomic.AddInt64(&s.Info.Inflight, -1) s.hooks.OnQosComplete(cl, pk) @@ -965,8 +964,8 @@ func (s *Server) processPubrel(cl *Client, pk packets.Packet) error { // processPubcomp processes a Pubcomp packet, denoting completion of a QOS 2 packet sent from the server. func (s *Server) processPubcomp(cl *Client, pk packets.Packet) error { // regardless of whether the pubcomp is a success or failure, we end the qos flow, delete inflight, and restore the quotas. - cl.State.Inflight.ReturnReceiveQuota() // +1 RECV QUOTA - cl.State.Inflight.ReturnSendQuota() // +1 SENT QUOTA + cl.State.Inflight.IncreaseReceiveQuota() // +1 RECV QUOTA + cl.State.Inflight.IncreaseSendQuota() // +1 SENT QUOTA if ok := cl.State.Inflight.Delete(pk.PacketID); ok { atomic.AddInt64(&s.Info.Inflight, -1) s.hooks.OnQosComplete(cl, pk)