Move msgToPacket to storage.Message.ToPacket

This commit is contained in:
mochi-co
2023-04-21 21:49:49 +01:00
parent c73ace2ea0
commit 605bb93c75
3 changed files with 64 additions and 25 deletions

View File

@@ -117,6 +117,36 @@ func (d *Message) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, d)
}
// ToPacket converts a storage.Message to a standard packet.
func (d *Message) ToPacket() packets.Packet {
pk := packets.Packet{
FixedHeader: d.FixedHeader,
PacketID: d.PacketID,
TopicName: d.TopicName,
Payload: d.Payload,
Origin: d.Origin,
Created: d.Created,
Properties: packets.Properties{
PayloadFormat: d.Properties.PayloadFormat,
PayloadFormatFlag: d.Properties.PayloadFormatFlag,
MessageExpiryInterval: d.Properties.MessageExpiryInterval,
ContentType: d.Properties.ContentType,
ResponseTopic: d.Properties.ResponseTopic,
CorrelationData: d.Properties.CorrelationData,
SubscriptionIdentifier: d.Properties.SubscriptionIdentifier,
TopicAlias: d.Properties.TopicAlias,
User: d.Properties.User,
},
}
// Return a deep copy of the packet data otherwise the slices will
// continue pointing at the values from the storage packet.
pk = pk.Copy(true)
pk.FixedHeader.Dup = d.FixedHeader.Dup
return pk
}
// Subscription is a storable representation of an mqtt subscription.
type Subscription struct {
T string `json:"t"`

View File

@@ -194,3 +194,35 @@ func TestSysInfoUnmarshalBinaryEmpty(t *testing.T) {
require.NoError(t, err)
require.Equal(t, SystemInfo{}, d)
}
func TestMessageToPacket(t *testing.T) {
d := messageStruct
pk := d.ToPacket()
require.Equal(t, packets.Packet{
Payload: []byte("payload"),
FixedHeader: packets.FixedHeader{
Remaining: d.FixedHeader.Remaining,
Type: d.FixedHeader.Type,
Qos: d.FixedHeader.Qos,
Dup: d.FixedHeader.Dup,
Retain: d.FixedHeader.Retain,
},
Origin: d.Origin,
TopicName: d.TopicName,
Properties: packets.Properties{
PayloadFormat: d.Properties.PayloadFormat,
PayloadFormatFlag: d.Properties.PayloadFormatFlag,
MessageExpiryInterval: d.Properties.MessageExpiryInterval,
ContentType: d.Properties.ContentType,
ResponseTopic: d.Properties.ResponseTopic,
CorrelationData: d.Properties.CorrelationData,
SubscriptionIdentifier: d.Properties.SubscriptionIdentifier,
TopicAlias: d.Properties.TopicAlias,
User: d.Properties.User,
},
PacketID: 100,
Created: d.Created,
}, pk)
}

View File

@@ -1414,7 +1414,7 @@ func (s *Server) loadClients(v []storage.Client) {
func (s *Server) loadInflight(v []storage.Message) {
for _, msg := range v {
if client, ok := s.Clients.Get(msg.Origin); ok {
client.State.Inflight.Set(msgToPacket(&msg))
client.State.Inflight.Set(msg.ToPacket())
}
}
}
@@ -1422,30 +1422,7 @@ func (s *Server) loadInflight(v []storage.Message) {
// loadRetained restores retained messages from the datastore.
func (s *Server) loadRetained(v []storage.Message) {
for _, msg := range v {
s.Topics.RetainMessage(msgToPacket(&msg))
}
}
// msgToPacket converts storage.Message to packets.Packet
func msgToPacket(msg *storage.Message) packets.Packet {
return packets.Packet{
FixedHeader: msg.FixedHeader,
PacketID: msg.PacketID,
TopicName: msg.TopicName,
Payload: msg.Payload,
Origin: msg.Origin,
Created: msg.Created,
Properties: packets.Properties{
PayloadFormat: msg.Properties.PayloadFormat,
PayloadFormatFlag: msg.Properties.PayloadFormatFlag,
MessageExpiryInterval: msg.Properties.MessageExpiryInterval,
ContentType: msg.Properties.ContentType,
ResponseTopic: msg.Properties.ResponseTopic,
CorrelationData: msg.Properties.CorrelationData,
SubscriptionIdentifier: msg.Properties.SubscriptionIdentifier,
TopicAlias: msg.Properties.TopicAlias,
User: msg.Properties.User,
},
s.Topics.RetainMessage(msg.ToPacket())
}
}