Simplify Client construction, add NewClient method to Server, add Publish convenience method

This commit is contained in:
mochi-co
2022-12-12 11:37:19 +00:00
parent aac186dcc1
commit fe5d9ffa61
6 changed files with 310 additions and 229 deletions

View File

@@ -199,6 +199,31 @@ func (o *Options) ensureDefaults() {
}
}
// NewClient returns a new Client instance, populated with all the required values and
// references to be used with the server. If you are using this client to directly publish
// messages from the embedding application, set the inline flag to true to bypass ACL and
// topic validation checks.
func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) *Client {
cl := newClient(c, &ops{ // [MQTT-3.1.2-6] implicit
capabilities: s.Options.Capabilities,
info: s.Info,
hooks: s.hooks,
log: s.Log,
})
cl.ID = id
cl.Net.Listener = listener
if inline { // inline clients bypass acl and some validity checks.
cl.Net.Inline = true
// By default we don't want to restrict developer publishes,
// but if you do, reset this after creating inline client.
cl.State.Inflight.ResetReceiveQuota(math.MaxInt32)
}
return cl
}
// AddHook attaches a new Hook to the server. Ideally, this should be called
// before the server is started with s.Serve().
func (s *Server) AddHook(hook Hook, config any) error {
@@ -281,27 +306,21 @@ func (s *Server) eventLoop() {
}
// EstablishConnection establishes a new client when a listener accepts a new connection.
func (s *Server) EstablishConnection(lid string, c net.Conn) error {
cl := NewClient(c, &ops{ // [MQTT-3.1.2-6] implicit
capabilities: s.Options.Capabilities,
info: s.Info,
hooks: s.hooks,
log: s.Log,
})
return s.attachClient(cl, lid)
func (s *Server) EstablishConnection(listener string, c net.Conn) error {
cl := s.NewClient(c, listener, "", false)
return s.attachClient(cl, listener)
}
// attachClient validates an incoming client connection and if viable, attaches the client
// to the server, performs session housekeeping, and reads incoming packets.
func (s *Server) attachClient(cl *Client, lid string) error {
func (s *Server) attachClient(cl *Client, listener string) error {
defer cl.Stop(nil)
pk, err := s.readConnectionPacket(cl)
if err != nil {
return fmt.Errorf("read connection: %w", err)
}
cl.ParseConnect(lid, pk)
cl.ParseConnect(listener, pk)
code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2]
if code != packets.CodeSuccess {
if err := s.sendConnack(cl, code, false); err != nil {
@@ -353,7 +372,7 @@ func (s *Server) attachClient(cl *Client, lid string) error {
cl.Properties.Will = Will{} // [MQTT-3.14.4-3] [MQTT-3.1.2-10]
}
s.Log.Debug().Str("client", cl.ID).Err(err).Str("remote", cl.Net.Remote).Str("listener", lid).Msg("client disconnected")
s.Log.Debug().Str("client", cl.ID).Err(err).Str("remote", cl.Net.Remote).Str("listener", listener).Msg("client disconnected")
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryIntervalFlag && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, err, expire)
if expire {
@@ -592,6 +611,24 @@ func (s *Server) processPingreq(cl *Client, _ packets.Packet) error {
})
}
// Publish publishes a publish packet into the broker as if it were sent from the speicfied client.
// This is a convenience function which wraps InjectPacket. As such, this method can publish packets
// to any topic (including $SYS) and bypass ACL checks. The qos byte is used for limiting the
// outbound qos (mqtt v5) rather than issuing to the broker (we assume qos 2 complete).
func (s *Server) Publish(topic string, payload []byte, retain bool, qos byte) error {
cl := s.NewClient(nil, "local", "inline", true)
return s.InjectPacket(cl, packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
Qos: qos,
Retain: retain,
},
TopicName: topic,
Payload: payload,
PacketID: uint16(qos), // we never process the inbound qos, but we need a packet id for validity checks.
})
}
// InjectPacket injects a packet into the broker as if it were sent from the specified client.
// InlineClients using this method can publish packets to any topic (including $SYS) and bypass ACL checks.
func (s *Server) InjectPacket(cl *Client, pk packets.Packet) error {
@@ -627,7 +664,7 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
pk.Origin = cl.ID
pk.Created = time.Now().Unix()
if pki, ok := cl.State.Inflight.Get(pk.PacketID); ok {
if pki, ok := cl.State.Inflight.Get(pk.PacketID); ok && !cl.Net.Inline {
if pki.FixedHeader.Type == packets.Pubrec { // [MQTT-4.3.3-10]
ack := s.buildAck(pk.PacketID, packets.Pubrec, 0, pk.Properties, packets.ErrPacketIdentifierInUse)
return cl.WritePacket(ack)
@@ -1087,12 +1124,14 @@ func (s *Server) DisconnectClient(cl *Client, code packets.Code) error {
out.Properties.ReasonString = code.Reason // // [MQTT-3.14.2-1]
}
err := cl.WritePacket(out)
// We already have a code we are using to disconnect the client, so we are not
// interested if the write packet fails due to a closed connection (as we are closing it).
_ = cl.WritePacket(out)
if !s.Options.Capabilities.Compatibilities.PassiveClientDisconnect {
cl.Stop(code)
}
return err
return code
}
// publishSysTopics publishes the current values to the server $SYS topics.
@@ -1304,9 +1343,7 @@ func (s *Server) loadSubscriptions(v []storage.Subscription) {
// loadClients restores clients from the datastore.
func (s *Server) loadClients(v []storage.Client) {
for _, c := range v {
cl := newClientStub()
cl.ID = c.ID
cl.Net.Listener = c.Listener
cl := s.NewClient(nil, c.Listener, c.ID, false)
cl.Properties.Username = c.Username
cl.Properties.Clean = c.Clean
cl.Properties.ProtocolVersion = c.ProtocolVersion