Combines two fixes

This commit is contained in:
Joshua MacDonald
2022-03-17 13:40:46 -07:00
parent 5de12d0460
commit c6643592f6
4 changed files with 68 additions and 33 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"strconv"
) )
// All of the valid packet types and their packet identifier. // All of the valid packet types and their packet identifier.
@@ -662,3 +663,8 @@ func (pk *Packet) UnsubscribeValidate() (byte, error) {
return Accepted, nil return Accepted, nil
} }
// FormatID returns the PacketID field as a decimal integer.
func (pk *Packet) FormatID() string {
return strconv.FormatUint(uint64(pk.PacketID), 10)
}

View File

@@ -3,6 +3,7 @@ package packets
import ( import (
"bytes" "bytes"
"errors" "errors"
"fmt"
"testing" "testing"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
@@ -1081,3 +1082,10 @@ func BenchmarkUnsubscribeValidate(b *testing.B) {
pk.UnsubscribeValidate() pk.UnsubscribeValidate()
} }
} }
func TestFormatPacketID(t *testing.T) {
for _, id := range []uint16{0, 7, 0x100, 0xffff} {
packet := &Packet{PacketID: id}
require.Equal(t, fmt.Sprint(id), packet.FormatID())
}
}

View File

@@ -564,7 +564,7 @@ func (s *Server) publishToSubscribers(pk packets.Packet) {
if s.Store != nil { if s.Store != nil {
s.onStorage(client, s.Store.WriteInflight(persistence.Message{ s.onStorage(client, s.Store.WriteInflight(persistence.Message{
ID: "if_" + client.ID + "_" + strconv.Itoa(int(out.PacketID)), ID: persistentID(client, out),
T: persistence.KRetained, T: persistence.KRetained,
FixedHeader: persistence.FixedHeader(out.FixedHeader), FixedHeader: persistence.FixedHeader(out.FixedHeader),
TopicName: out.TopicName, TopicName: out.TopicName,
@@ -586,7 +586,7 @@ func (s *Server) processPuback(cl *clients.Client, pk packets.Packet) error {
atomic.AddInt64(&s.System.Inflight, -1) atomic.AddInt64(&s.System.Inflight, -1)
} }
if s.Store != nil { if s.Store != nil {
s.onStorage(cl, s.Store.DeleteInflight("if_"+cl.ID+"_"+strconv.Itoa(int(pk.PacketID)))) s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk)))
} }
return nil return nil
} }
@@ -628,7 +628,7 @@ func (s *Server) processPubrel(cl *clients.Client, pk packets.Packet) error {
} }
if s.Store != nil { if s.Store != nil {
s.onStorage(cl, s.Store.DeleteInflight("if_"+cl.ID+"_"+strconv.Itoa(int(pk.PacketID)))) s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk)))
} }
return nil return nil
@@ -641,7 +641,7 @@ func (s *Server) processPubcomp(cl *clients.Client, pk packets.Packet) error {
atomic.AddInt64(&s.System.Inflight, -1) atomic.AddInt64(&s.System.Inflight, -1)
} }
if s.Store != nil { if s.Store != nil {
s.onStorage(cl, s.Store.DeleteInflight("if_"+cl.ID+"_"+strconv.Itoa(int(pk.PacketID)))) s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, pk)))
} }
return nil return nil
} }
@@ -721,6 +721,17 @@ func (s *Server) processUnsubscribe(cl *clients.Client, pk packets.Packet) error
return nil return nil
} }
// atomicItoa reads an *int64 and formats a decimal string.
func atomicItoa(ptr *int64) string {
return strconv.FormatInt(atomic.LoadInt64(ptr), 10)
}
// persistentID return a string combining the client and packet
// identifiers for use with the persistence layer.
func persistentID(client *clients.Client, pk packets.Packet) string {
return "if_" + client.ID + "_" + pk.FormatID()
}
// publishSysTopics publishes the current values to the server $SYS topics. // publishSysTopics publishes the current values to the server $SYS topics.
// Due to the int to string conversions this method is not as cheap as // Due to the int to string conversions this method is not as cheap as
// some of the others so the publishing interval should be set appropriately. // some of the others so the publishing interval should be set appropriately.
@@ -732,26 +743,27 @@ func (s *Server) publishSysTopics() {
}, },
} }
s.System.Uptime = time.Now().Unix() - s.System.Started uptime := time.Now().Unix() - atomic.LoadInt64(&s.System.Started)
atomic.StoreInt64(&s.System.Uptime, uptime)
topics := map[string]string{ topics := map[string]string{
"$SYS/broker/version": s.System.Version, "$SYS/broker/version": s.System.Version,
"$SYS/broker/uptime": strconv.Itoa(int(s.System.Uptime)), "$SYS/broker/uptime": atomicItoa(&s.System.Uptime),
"$SYS/broker/timestamp": strconv.Itoa(int(s.System.Started)), "$SYS/broker/timestamp": atomicItoa(&s.System.Started),
"$SYS/broker/load/bytes/received": strconv.Itoa(int(s.System.BytesRecv)), "$SYS/broker/load/bytes/received": atomicItoa(&s.System.BytesRecv),
"$SYS/broker/load/bytes/sent": strconv.Itoa(int(s.System.BytesSent)), "$SYS/broker/load/bytes/sent": atomicItoa(&s.System.BytesSent),
"$SYS/broker/clients/connected": strconv.Itoa(int(s.System.ClientsConnected)), "$SYS/broker/clients/connected": atomicItoa(&s.System.ClientsConnected),
"$SYS/broker/clients/disconnected": strconv.Itoa(int(s.System.ClientsDisconnected)), "$SYS/broker/clients/disconnected": atomicItoa(&s.System.ClientsDisconnected),
"$SYS/broker/clients/maximum": strconv.Itoa(int(s.System.ClientsMax)), "$SYS/broker/clients/maximum": atomicItoa(&s.System.ClientsMax),
"$SYS/broker/clients/total": strconv.Itoa(int(s.System.ClientsTotal)), "$SYS/broker/clients/total": atomicItoa(&s.System.ClientsTotal),
"$SYS/broker/connections/total": strconv.Itoa(int(s.System.ConnectionsTotal)), "$SYS/broker/connections/total": atomicItoa(&s.System.ConnectionsTotal),
"$SYS/broker/messages/received": strconv.Itoa(int(s.System.MessagesRecv)), "$SYS/broker/messages/received": atomicItoa(&s.System.MessagesRecv),
"$SYS/broker/messages/sent": strconv.Itoa(int(s.System.MessagesSent)), "$SYS/broker/messages/sent": atomicItoa(&s.System.MessagesSent),
"$SYS/broker/messages/publish/dropped": strconv.Itoa(int(s.System.PublishDropped)), "$SYS/broker/messages/publish/dropped": atomicItoa(&s.System.PublishDropped),
"$SYS/broker/messages/publish/received": strconv.Itoa(int(s.System.PublishRecv)), "$SYS/broker/messages/publish/received": atomicItoa(&s.System.PublishRecv),
"$SYS/broker/messages/publish/sent": strconv.Itoa(int(s.System.PublishSent)), "$SYS/broker/messages/publish/sent": atomicItoa(&s.System.PublishSent),
"$SYS/broker/messages/retained/count": strconv.Itoa(int(s.System.Retained)), "$SYS/broker/messages/retained/count": atomicItoa(&s.System.Retained),
"$SYS/broker/messages/inflight": strconv.Itoa(int(s.System.Inflight)), "$SYS/broker/messages/inflight": atomicItoa(&s.System.Inflight),
"$SYS/broker/subscriptions/count": strconv.Itoa(int(s.System.Subscriptions)), "$SYS/broker/subscriptions/count": atomicItoa(&s.System.Subscriptions),
} }
for topic, payload := range topics { for topic, payload := range topics {
@@ -786,7 +798,7 @@ func (s *Server) ResendClientInflight(cl *clients.Client, force bool) error {
} }
if s.Store != nil { if s.Store != nil {
s.onStorage(cl, s.Store.DeleteInflight("if_"+cl.ID+"_"+strconv.Itoa(int(tk.Packet.PacketID)))) s.onStorage(cl, s.Store.DeleteInflight(persistentID(cl, tk.Packet)))
} }
continue continue
@@ -811,7 +823,7 @@ func (s *Server) ResendClientInflight(cl *clients.Client, force bool) error {
if s.Store != nil { if s.Store != nil {
s.onStorage(cl, s.Store.WriteInflight(persistence.Message{ s.onStorage(cl, s.Store.WriteInflight(persistence.Message{
ID: "if_" + cl.ID + "_" + strconv.Itoa(int(tk.Packet.PacketID)), ID: persistentID(cl, tk.Packet),
T: persistence.KRetained, T: persistence.KRetained,
FixedHeader: persistence.FixedHeader(tk.Packet.FixedHeader), FixedHeader: persistence.FixedHeader(tk.Packet.FixedHeader),
TopicName: tk.Packet.TopicName, TopicName: tk.Packet.TopicName,

View File

@@ -127,6 +127,16 @@ func BenchmarkServerAddStore(b *testing.B) {
} }
} }
func TestPersistentID(t *testing.T) {
s := New()
pk := packets.Packet{
PacketID: 1234,
}
cl := clients.NewClientStub(s.System)
cl.ID = "test"
require.Equal(t, "if_test_1234", persistentID(cl, pk))
}
func TestServerAddListener(t *testing.T) { func TestServerAddListener(t *testing.T) {
s := New() s := New()
require.NotNil(t, s) require.NotNil(t, s)
@@ -1055,7 +1065,7 @@ func TestServerProcessPublishSystemPrefix(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(0), s.System.BytesSent) require.Equal(t, int64(0), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerProcessPublishBadACL(t *testing.T) { func TestServerProcessPublishBadACL(t *testing.T) {
@@ -1120,8 +1130,7 @@ func TestServerPublishInline(t *testing.T) {
'h', 'e', 'l', 'l', 'o', 'h', 'e', 'l', 'l', 'o',
}, <-ack1) }, <-ack1)
bsent := atomic.LoadInt64(&s.System.BytesSent) require.Equal(t, int64(14), atomic.LoadInt64(&s.System.BytesSent))
require.Equal(t, int64(14), bsent)
close(s.inline.done) close(s.inline.done)
} }
@@ -1159,7 +1168,7 @@ func TestServerPublishInlineRetain(t *testing.T) {
'h', 'e', 'l', 'l', 'o', 'h', 'e', 'l', 'l', 'o',
}, <-ack1) }, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent) require.Equal(t, int64(14), atomic.LoadInt64(&s.System.BytesSent))
close(s.inline.done) close(s.inline.done)
} }
@@ -1169,7 +1178,7 @@ func TestServerPublishInlineSysTopicError(t *testing.T) {
err := s.Publish("$SYS/stuff", []byte("hello"), false) err := s.Publish("$SYS/stuff", []byte("hello"), false)
require.Error(t, err) require.Error(t, err)
require.Equal(t, int64(0), s.System.BytesSent) require.Equal(t, int64(0), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerEventOnMessage(t *testing.T) { func TestServerEventOnMessage(t *testing.T) {
@@ -1218,7 +1227,7 @@ func TestServerEventOnMessage(t *testing.T) {
'h', 'e', 'l', 'l', 'o', 'h', 'e', 'l', 'l', 'o',
}, <-ack1) }, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent) require.Equal(t, int64(14), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerProcessPublishHookOnMessageModify(t *testing.T) { func TestServerProcessPublishHookOnMessageModify(t *testing.T) {
@@ -1270,7 +1279,7 @@ func TestServerProcessPublishHookOnMessageModify(t *testing.T) {
'w', 'o', 'r', 'l', 'd', 'w', 'o', 'r', 'l', 'd',
}, <-ack1) }, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent) require.Equal(t, int64(14), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) { func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) {
@@ -1314,7 +1323,7 @@ func TestServerProcessPublishHookOnMessageModifyError(t *testing.T) {
'h', 'e', 'l', 'l', 'o', 'h', 'e', 'l', 'l', 'o',
}, <-ack1) }, <-ack1)
require.Equal(t, int64(14), s.System.BytesSent) require.Equal(t, int64(14), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerProcessPublishHookOnMessageAllowClients(t *testing.T) { func TestServerProcessPublishHookOnMessageAllowClients(t *testing.T) {
@@ -1393,7 +1402,7 @@ func TestServerProcessPublishHookOnMessageAllowClients(t *testing.T) {
'a', 'a',
}, <-ack2) }, <-ack2)
require.Equal(t, int64(24), s.System.BytesSent) require.Equal(t, int64(24), atomic.LoadInt64(&s.System.BytesSent))
} }
func TestServerProcessPuback(t *testing.T) { func TestServerProcessPuback(t *testing.T) {