mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
251 lines
7.1 KiB
Go
251 lines
7.1 KiB
Go
// SPDX-License-Identifier: MIT
|
|
// SPDX-FileCopyrightText: 2022 mochi-co
|
|
// SPDX-FileContributor: mochi-co
|
|
|
|
package debug
|
|
|
|
import (
|
|
"strings"
|
|
|
|
"github.com/mochi-co/mqtt/v2"
|
|
"github.com/mochi-co/mqtt/v2/hooks/storage"
|
|
"github.com/mochi-co/mqtt/v2/packets"
|
|
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// Options contains configuration settings for the debug output.
|
|
type Options struct {
|
|
ShowPacketData bool // include decoded packet data (default false)
|
|
ShowPings bool // show ping requests and responses (default false)
|
|
ShowPasswords bool // show connecting user passwords (default false)
|
|
}
|
|
|
|
// Hook is a debugging hook which logs additional low-level information from the server.
|
|
type Hook struct {
|
|
mqtt.HookBase
|
|
config *Options
|
|
Log *zerolog.Logger
|
|
}
|
|
|
|
// ID returns the ID of the hook.
|
|
func (h *Hook) ID() string {
|
|
return "debug"
|
|
}
|
|
|
|
// Provides indicates that this hook provides all methods.
|
|
func (h *Hook) Provides(b byte) bool {
|
|
return true
|
|
}
|
|
|
|
// Init is called when the hook is initialized.
|
|
func (h *Hook) Init(config any) error {
|
|
if _, ok := config.(*Options); !ok && config != nil {
|
|
return mqtt.ErrInvalidConfigType
|
|
}
|
|
|
|
if config == nil {
|
|
config = new(Options)
|
|
}
|
|
|
|
h.config = config.(*Options)
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetOpts is called when the hook receives inheritable server parameters.
|
|
func (h *Hook) SetOpts(l *zerolog.Logger, opts *mqtt.HookOptions) {
|
|
h.Log = l
|
|
h.Log.Debug().Interface("opts", opts).Str("method", "SetOpts").Send()
|
|
}
|
|
|
|
// Stop is called when the hook is stopped.
|
|
func (h *Hook) Stop() error {
|
|
h.Log.Debug().Str("method", "Stop").Send()
|
|
return nil
|
|
}
|
|
|
|
// OnStarted is called when the server starts.
|
|
func (h *Hook) OnStarted() {
|
|
h.Log.Debug().Str("method", "OnStarted").Send()
|
|
}
|
|
|
|
// OnStopped is called when the server stops.
|
|
func (h *Hook) OnStopped() {
|
|
h.Log.Debug().Str("method", "OnStopped").Send()
|
|
}
|
|
|
|
// OnPacketRead is called when a new packet is received from a client.
|
|
func (h *Hook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
|
|
if (pk.FixedHeader.Type == packets.Pingresp || pk.FixedHeader.Type == packets.Pingreq) && !h.config.ShowPings {
|
|
return pk, nil
|
|
}
|
|
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("%s << %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID)
|
|
|
|
return pk, nil
|
|
}
|
|
|
|
// OnPacketSent is called when a packet is sent to a client.
|
|
func (h *Hook) OnPacketSent(cl *mqtt.Client, pk packets.Packet, b []byte) {
|
|
if (pk.FixedHeader.Type == packets.Pingresp || pk.FixedHeader.Type == packets.Pingreq) && !h.config.ShowPings {
|
|
return
|
|
}
|
|
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("%s >> %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID)
|
|
}
|
|
|
|
// OnRetainMessage is called when a published message is retained (or retain deleted/modified).
|
|
func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("retained message on topic")
|
|
}
|
|
|
|
// OnQosPublish is called when a publish packet with Qos is issued to a subscriber.
|
|
func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, resends int) {
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("inflight out")
|
|
}
|
|
|
|
// OnQosComplete is called when the Qos flow for a message has been completed.
|
|
func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("inflight complete")
|
|
}
|
|
|
|
// OnQosDropped is called the Qos flow for a message expires.
|
|
func (h *Hook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) {
|
|
h.Log.Debug().Interface("m", h.packetMeta(pk)).Msgf("inflight dropped")
|
|
}
|
|
|
|
// OnLWTSent is called when a will message has been issued from a disconnecting client.
|
|
func (h *Hook) OnLWTSent(cl *mqtt.Client, pk packets.Packet) {
|
|
h.Log.Debug().Str("method", "OnLWTSent").Str("client", cl.ID).Msg("sent lwt for client")
|
|
}
|
|
|
|
// OnRetainedExpired is called when the server clears expired retained messages.
|
|
func (h *Hook) OnRetainedExpired(filter string) {
|
|
h.Log.Debug().Str("method", "OnRetainedExpired").Str("topic", filter).Msg("retained message expired")
|
|
}
|
|
|
|
// OnClientExpired is called when the server clears an expired client.
|
|
func (h *Hook) OnClientExpired(cl *mqtt.Client) {
|
|
h.Log.Debug().Str("method", "OnClientExpired").Str("client", cl.ID).Msg("client session expired")
|
|
}
|
|
|
|
// StoredClients is called when the server restores clients from a store.
|
|
func (h *Hook) StoredClients() (v []storage.Client, err error) {
|
|
h.Log.Debug().
|
|
Str("method", "StoredClients").
|
|
Send()
|
|
|
|
return v, nil
|
|
}
|
|
|
|
// StoredClients is called when the server restores subscriptions from a store.
|
|
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
|
|
h.Log.Debug().
|
|
Str("method", "StoredSubscriptions").
|
|
Send()
|
|
|
|
return v, nil
|
|
}
|
|
|
|
// StoredClients is called when the server restores retained messages from a store.
|
|
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
|
|
h.Log.Debug().
|
|
Str("method", "StoredRetainedMessages").
|
|
Send()
|
|
|
|
return v, nil
|
|
}
|
|
|
|
// StoredClients is called when the server restores inflight messages from a store.
|
|
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
|
|
h.Log.Debug().
|
|
Str("method", "StoredInflightMessages").
|
|
Send()
|
|
|
|
return v, nil
|
|
}
|
|
|
|
// StoredClients is called when the server restores system info from a store.
|
|
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
|
h.Log.Debug().
|
|
Str("method", "StoredClients").
|
|
Send()
|
|
|
|
return v, nil
|
|
}
|
|
|
|
// packetMeta adds additional type-specific metadata to the debug logs.
|
|
func (h *Hook) packetMeta(pk packets.Packet) map[string]any {
|
|
m := map[string]any{}
|
|
switch pk.FixedHeader.Type {
|
|
case packets.Connect:
|
|
m["id"] = pk.Connect.ClientIdentifier
|
|
m["clean"] = pk.Connect.Clean
|
|
m["keepalive"] = pk.Connect.Keepalive
|
|
m["version"] = pk.ProtocolVersion
|
|
m["username"] = string(pk.Connect.Username)
|
|
if h.config.ShowPasswords {
|
|
m["password"] = string(pk.Connect.Password)
|
|
}
|
|
if pk.Connect.WillFlag {
|
|
m["will_topic"] = pk.Connect.WillTopic
|
|
m["will_payload"] = string(pk.Connect.WillPayload)
|
|
}
|
|
case packets.Publish:
|
|
m["topic"] = pk.TopicName
|
|
m["payload"] = string(pk.Payload)
|
|
m["raw"] = pk.Payload
|
|
m["qos"] = pk.FixedHeader.Qos
|
|
m["id"] = pk.PacketID
|
|
case packets.Connack:
|
|
fallthrough
|
|
case packets.Disconnect:
|
|
fallthrough
|
|
case packets.Puback:
|
|
fallthrough
|
|
case packets.Pubrec:
|
|
fallthrough
|
|
case packets.Pubrel:
|
|
fallthrough
|
|
case packets.Pubcomp:
|
|
m["id"] = pk.PacketID
|
|
m["reason"] = int(pk.ReasonCode)
|
|
if pk.ReasonCode > packets.CodeSuccess.Code && pk.ProtocolVersion == 5 {
|
|
m["reason_string"] = pk.Properties.ReasonString
|
|
}
|
|
case packets.Subscribe:
|
|
f := map[string]int{}
|
|
ids := map[string]int{}
|
|
for _, v := range pk.Filters {
|
|
f[v.Filter] = int(v.Qos)
|
|
ids[v.Filter] = v.Identifier
|
|
}
|
|
m["filters"] = f
|
|
m["subids"] = f
|
|
|
|
case packets.Unsubscribe:
|
|
f := []string{}
|
|
for _, v := range pk.Filters {
|
|
f = append(f, v.Filter)
|
|
}
|
|
m["filters"] = f
|
|
case packets.Suback:
|
|
fallthrough
|
|
case packets.Unsuback:
|
|
r := []int{}
|
|
for _, v := range pk.ReasonCodes {
|
|
r = append(r, int(v))
|
|
}
|
|
m["reasons"] = r
|
|
case packets.Auth:
|
|
// tbd
|
|
}
|
|
|
|
if h.config.ShowPacketData {
|
|
m["packet"] = pk
|
|
}
|
|
|
|
return m
|
|
}
|