mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-10-06 00:27:01 +08:00
Rename Quota methods for clarity (#159)
This commit is contained in:
@@ -272,7 +272,7 @@ func (cl *Client) ResendInflightMessages(force bool) error {
|
|||||||
tk.FixedHeader.Dup = true // [MQTT-3.3.1-1] [MQTT-3.3.1-3]
|
tk.FixedHeader.Dup = true // [MQTT-3.3.1-1] [MQTT-3.3.1-3]
|
||||||
}
|
}
|
||||||
|
|
||||||
// cl.ops.hooks.OnQosPublish(cl, tk.Packet, nt, tk.Resends)
|
cl.ops.hooks.OnQosPublish(cl, tk, tk.Created, 0)
|
||||||
err := cl.WritePacket(tk)
|
err := cl.WritePacket(tk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -36,7 +36,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = server.AddHook(new(debug.Hook), &debug.Options{
|
err = server.AddHook(new(debug.Hook), &debug.Options{
|
||||||
ShowPacketData: true,
|
// ShowPacketData: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
12
inflight.go
12
inflight.go
@@ -104,14 +104,14 @@ func (i *Inflight) Delete(id uint16) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TakeRecieveQuota reduces the receive quota by 1.
|
// TakeRecieveQuota reduces the receive quota by 1.
|
||||||
func (i *Inflight) TakeReceiveQuota() {
|
func (i *Inflight) DecreaseReceiveQuota() {
|
||||||
if atomic.LoadInt32(&i.receiveQuota) > 0 {
|
if atomic.LoadInt32(&i.receiveQuota) > 0 {
|
||||||
atomic.AddInt32(&i.receiveQuota, -1)
|
atomic.AddInt32(&i.receiveQuota, -1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TakeRecieveQuota increases the receive quota by 1.
|
// TakeRecieveQuota increases the receive quota by 1.
|
||||||
func (i *Inflight) ReturnReceiveQuota() {
|
func (i *Inflight) IncreaseReceiveQuota() {
|
||||||
if atomic.LoadInt32(&i.receiveQuota) < atomic.LoadInt32(&i.maximumReceiveQuota) {
|
if atomic.LoadInt32(&i.receiveQuota) < atomic.LoadInt32(&i.maximumReceiveQuota) {
|
||||||
atomic.AddInt32(&i.receiveQuota, 1)
|
atomic.AddInt32(&i.receiveQuota, 1)
|
||||||
}
|
}
|
||||||
@@ -123,15 +123,15 @@ func (i *Inflight) ResetReceiveQuota(n int32) {
|
|||||||
atomic.StoreInt32(&i.maximumReceiveQuota, n)
|
atomic.StoreInt32(&i.maximumReceiveQuota, n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TakeSendQuota reduces the send quota by 1.
|
// DecreaseSendQuota reduces the send quota by 1.
|
||||||
func (i *Inflight) TakeSendQuota() {
|
func (i *Inflight) DecreaseSendQuota() {
|
||||||
if atomic.LoadInt32(&i.sendQuota) > 0 {
|
if atomic.LoadInt32(&i.sendQuota) > 0 {
|
||||||
atomic.AddInt32(&i.sendQuota, -1)
|
atomic.AddInt32(&i.sendQuota, -1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReturnSendQuota increases the send quota by 1.
|
// IncreaseSendQuota increases the send quota by 1.
|
||||||
func (i *Inflight) ReturnSendQuota() {
|
func (i *Inflight) IncreaseSendQuota() {
|
||||||
if atomic.LoadInt32(&i.sendQuota) < atomic.LoadInt32(&i.maximumSendQuota) {
|
if atomic.LoadInt32(&i.sendQuota) < atomic.LoadInt32(&i.maximumSendQuota) {
|
||||||
atomic.AddInt32(&i.sendQuota, 1)
|
atomic.AddInt32(&i.sendQuota, 1)
|
||||||
}
|
}
|
||||||
|
@@ -95,12 +95,12 @@ func TestReceiveQuota(t *testing.T) {
|
|||||||
require.Equal(t, int32(4), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(4), atomic.LoadInt32(&i.receiveQuota))
|
||||||
|
|
||||||
// Return 1
|
// Return 1
|
||||||
i.ReturnReceiveQuota()
|
i.IncreaseReceiveQuota()
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota))
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota))
|
||||||
|
|
||||||
// Try to go over max limit
|
// Try to go over max limit
|
||||||
i.ReturnReceiveQuota()
|
i.IncreaseReceiveQuota()
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumReceiveQuota))
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.receiveQuota))
|
||||||
|
|
||||||
@@ -110,12 +110,12 @@ func TestReceiveQuota(t *testing.T) {
|
|||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.receiveQuota))
|
||||||
|
|
||||||
// Take 1
|
// Take 1
|
||||||
i.TakeReceiveQuota()
|
i.DecreaseReceiveQuota()
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota))
|
||||||
require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota))
|
||||||
|
|
||||||
// Try to go below zero
|
// Try to go below zero
|
||||||
i.TakeReceiveQuota()
|
i.DecreaseReceiveQuota()
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumReceiveQuota))
|
||||||
require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota))
|
require.Equal(t, int32(0), atomic.LoadInt32(&i.receiveQuota))
|
||||||
}
|
}
|
||||||
@@ -137,12 +137,12 @@ func TestSendQuota(t *testing.T) {
|
|||||||
require.Equal(t, int32(4), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(4), atomic.LoadInt32(&i.sendQuota))
|
||||||
|
|
||||||
// Return 1
|
// Return 1
|
||||||
i.ReturnSendQuota()
|
i.IncreaseSendQuota()
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota))
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota))
|
||||||
|
|
||||||
// Try to go over max limit
|
// Try to go over max limit
|
||||||
i.ReturnSendQuota()
|
i.IncreaseSendQuota()
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.maximumSendQuota))
|
||||||
require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(5), atomic.LoadInt32(&i.sendQuota))
|
||||||
|
|
||||||
@@ -152,12 +152,12 @@ func TestSendQuota(t *testing.T) {
|
|||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.sendQuota))
|
||||||
|
|
||||||
// Take 1
|
// Take 1
|
||||||
i.TakeSendQuota()
|
i.DecreaseSendQuota()
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota))
|
||||||
require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota))
|
||||||
|
|
||||||
// Try to go below zero
|
// Try to go below zero
|
||||||
i.TakeSendQuota()
|
i.DecreaseSendQuota()
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota))
|
require.Equal(t, int32(1), atomic.LoadInt32(&i.maximumSendQuota))
|
||||||
require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota))
|
require.Equal(t, int32(0), atomic.LoadInt32(&i.sendQuota))
|
||||||
}
|
}
|
||||||
|
21
server.go
21
server.go
@@ -607,7 +607,7 @@ func (s *Server) processPacket(cl *Client, pk packets.Packet) error {
|
|||||||
if ok := cl.State.Inflight.Delete(next.PacketID); ok {
|
if ok := cl.State.Inflight.Delete(next.PacketID); ok {
|
||||||
atomic.AddInt64(&s.Info.Inflight, -1)
|
atomic.AddInt64(&s.Info.Inflight, -1)
|
||||||
}
|
}
|
||||||
cl.State.Inflight.TakeSendQuota()
|
cl.State.Inflight.DecreaseSendQuota()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -722,7 +722,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cl.State.Inflight.TakeReceiveQuota()
|
cl.State.Inflight.DecreaseReceiveQuota()
|
||||||
ack := s.buildAck(pk.PacketID, packets.Puback, 0, pk.Properties, packets.QosCodes[pk.FixedHeader.Qos]) // [MQTT-4.3.2-4]
|
ack := s.buildAck(pk.PacketID, packets.Puback, 0, pk.Properties, packets.QosCodes[pk.FixedHeader.Qos]) // [MQTT-4.3.2-4]
|
||||||
if pk.FixedHeader.Qos == 2 {
|
if pk.FixedHeader.Qos == 2 {
|
||||||
ack = s.buildAck(pk.PacketID, packets.Pubrec, 0, pk.Properties, packets.CodeSuccess) // [MQTT-3.3.4-1] [MQTT-4.3.3-8]
|
ack = s.buildAck(pk.PacketID, packets.Pubrec, 0, pk.Properties, packets.CodeSuccess) // [MQTT-3.3.4-1] [MQTT-4.3.3-8]
|
||||||
@@ -741,7 +741,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
|
|||||||
if ok := cl.State.Inflight.Delete(ack.PacketID); ok {
|
if ok := cl.State.Inflight.Delete(ack.PacketID); ok {
|
||||||
atomic.AddInt64(&s.Info.Inflight, -1)
|
atomic.AddInt64(&s.Info.Inflight, -1)
|
||||||
}
|
}
|
||||||
cl.State.Inflight.ReturnReceiveQuota()
|
cl.State.Inflight.IncreaseReceiveQuota()
|
||||||
s.hooks.OnQosComplete(cl, pk)
|
s.hooks.OnQosComplete(cl, pk)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -838,6 +838,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
|
|||||||
if ok := cl.State.Inflight.Set(out); ok { // [MQTT-4.3.2-3] [MQTT-4.3.3-3]
|
if ok := cl.State.Inflight.Set(out); ok { // [MQTT-4.3.2-3] [MQTT-4.3.3-3]
|
||||||
atomic.AddInt64(&s.Info.Inflight, 1)
|
atomic.AddInt64(&s.Info.Inflight, 1)
|
||||||
s.hooks.OnQosPublish(cl, out, out.Created, 0)
|
s.hooks.OnQosPublish(cl, out, out.Created, 0)
|
||||||
|
cl.State.Inflight.DecreaseSendQuota()
|
||||||
}
|
}
|
||||||
|
|
||||||
if sentQuota == 0 && atomic.LoadInt32(&cl.State.Inflight.maximumSendQuota) > 0 {
|
if sentQuota == 0 && atomic.LoadInt32(&cl.State.Inflight.maximumSendQuota) > 0 {
|
||||||
@@ -851,8 +852,6 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
|
|||||||
return pk, packets.CodeDisconnect
|
return pk, packets.CodeDisconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
cl.State.Inflight.TakeSendQuota()
|
|
||||||
|
|
||||||
return out, cl.WritePacket(out)
|
return out, cl.WritePacket(out)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -902,7 +901,7 @@ func (s *Server) processPuback(cl *Client, pk packets.Packet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.2-5]
|
if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.2-5]
|
||||||
cl.State.Inflight.ReturnSendQuota()
|
cl.State.Inflight.IncreaseSendQuota()
|
||||||
atomic.AddInt64(&s.Info.Inflight, -1)
|
atomic.AddInt64(&s.Info.Inflight, -1)
|
||||||
s.hooks.OnQosComplete(cl, pk)
|
s.hooks.OnQosComplete(cl, pk)
|
||||||
}
|
}
|
||||||
@@ -925,7 +924,7 @@ func (s *Server) processPubrec(cl *Client, pk packets.Packet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ack := s.buildAck(pk.PacketID, packets.Pubrel, 1, pk.Properties, packets.CodeSuccess) // [MQTT-4.3.3-4] ![MQTT-4.3.3-6]
|
ack := s.buildAck(pk.PacketID, packets.Pubrel, 1, pk.Properties, packets.CodeSuccess) // [MQTT-4.3.3-4] ![MQTT-4.3.3-6]
|
||||||
cl.State.Inflight.TakeReceiveQuota() // -1 RECV QUOTA
|
cl.State.Inflight.DecreaseReceiveQuota() // -1 RECV QUOTA
|
||||||
cl.State.Inflight.Set(ack) // [MQTT-4.3.3-5]
|
cl.State.Inflight.Set(ack) // [MQTT-4.3.3-5]
|
||||||
return cl.WritePacket(ack)
|
return cl.WritePacket(ack)
|
||||||
}
|
}
|
||||||
@@ -952,8 +951,8 @@ func (s *Server) processPubrel(cl *Client, pk packets.Packet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cl.State.Inflight.ReturnReceiveQuota() // +1 RECV QUOTA
|
cl.State.Inflight.IncreaseReceiveQuota() // +1 RECV QUOTA
|
||||||
cl.State.Inflight.ReturnSendQuota() // +1 SENT QUOTA
|
cl.State.Inflight.IncreaseSendQuota() // +1 SENT QUOTA
|
||||||
if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.3-12]
|
if ok := cl.State.Inflight.Delete(pk.PacketID); ok { // [MQTT-4.3.3-12]
|
||||||
atomic.AddInt64(&s.Info.Inflight, -1)
|
atomic.AddInt64(&s.Info.Inflight, -1)
|
||||||
s.hooks.OnQosComplete(cl, pk)
|
s.hooks.OnQosComplete(cl, pk)
|
||||||
@@ -965,8 +964,8 @@ func (s *Server) processPubrel(cl *Client, pk packets.Packet) error {
|
|||||||
// processPubcomp processes a Pubcomp packet, denoting completion of a QOS 2 packet sent from the server.
|
// processPubcomp processes a Pubcomp packet, denoting completion of a QOS 2 packet sent from the server.
|
||||||
func (s *Server) processPubcomp(cl *Client, pk packets.Packet) error {
|
func (s *Server) processPubcomp(cl *Client, pk packets.Packet) error {
|
||||||
// regardless of whether the pubcomp is a success or failure, we end the qos flow, delete inflight, and restore the quotas.
|
// regardless of whether the pubcomp is a success or failure, we end the qos flow, delete inflight, and restore the quotas.
|
||||||
cl.State.Inflight.ReturnReceiveQuota() // +1 RECV QUOTA
|
cl.State.Inflight.IncreaseReceiveQuota() // +1 RECV QUOTA
|
||||||
cl.State.Inflight.ReturnSendQuota() // +1 SENT QUOTA
|
cl.State.Inflight.IncreaseSendQuota() // +1 SENT QUOTA
|
||||||
if ok := cl.State.Inflight.Delete(pk.PacketID); ok {
|
if ok := cl.State.Inflight.Delete(pk.PacketID); ok {
|
||||||
atomic.AddInt64(&s.Info.Inflight, -1)
|
atomic.AddInt64(&s.Info.Inflight, -1)
|
||||||
s.hooks.OnQosComplete(cl, pk)
|
s.hooks.OnQosComplete(cl, pk)
|
||||||
|
Reference in New Issue
Block a user