mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 04:26:23 +08:00
Add a demonstration in examples/hooks on how to subscribe to a topic and publish messages directly within the hook. (#333)
This commit is contained in:
@@ -28,7 +28,10 @@ func main() {
|
|||||||
done <- true
|
done <- true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
server := mqtt.New(nil)
|
server := mqtt.New(&mqtt.Options{
|
||||||
|
InlineClient: true, // you must enable inline client to use direct publishing and subscribing.
|
||||||
|
})
|
||||||
|
|
||||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||||
err := server.AddListener(tcp)
|
err := server.AddListener(tcp)
|
||||||
@@ -36,7 +39,11 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = server.AddHook(new(ExampleHook), map[string]any{})
|
// Add custom hook (ExampleHook) to the server
|
||||||
|
err = server.AddHook(new(ExampleHook), &ExampleHookOptions{
|
||||||
|
Server: server,
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -87,8 +94,14 @@ func main() {
|
|||||||
server.Log.Info("main.go finished")
|
server.Log.Info("main.go finished")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Options contains configuration settings for the hook.
|
||||||
|
type ExampleHookOptions struct {
|
||||||
|
Server *mqtt.Server
|
||||||
|
}
|
||||||
|
|
||||||
type ExampleHook struct {
|
type ExampleHook struct {
|
||||||
mqtt.HookBase
|
mqtt.HookBase
|
||||||
|
config *ExampleHookOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *ExampleHook) ID() string {
|
func (h *ExampleHook) ID() string {
|
||||||
@@ -108,11 +121,34 @@ func (h *ExampleHook) Provides(b byte) bool {
|
|||||||
|
|
||||||
func (h *ExampleHook) Init(config any) error {
|
func (h *ExampleHook) Init(config any) error {
|
||||||
h.Log.Info("initialised")
|
h.Log.Info("initialised")
|
||||||
|
if _, ok := config.(*ExampleHookOptions); !ok && config != nil {
|
||||||
|
return mqtt.ErrInvalidConfigType
|
||||||
|
}
|
||||||
|
|
||||||
|
h.config = config.(*ExampleHookOptions)
|
||||||
|
if h.config.Server == nil {
|
||||||
|
return mqtt.ErrInvalidConfigType
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// subscribeCallback handles messages for subscribed topics
|
||||||
|
func (h *ExampleHook) subscribeCallback(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
|
||||||
|
h.Log.Info("hook subscribed message", "client", cl.ID, "topic", pk.TopicName)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
|
||||||
h.Log.Info("client connected", "client", cl.ID)
|
h.Log.Info("client connected", "client", cl.ID)
|
||||||
|
|
||||||
|
// Example demonstrating how to subscribe to a topic within the hook.
|
||||||
|
h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback)
|
||||||
|
|
||||||
|
// Example demonstrating how to publish a message within the hook
|
||||||
|
err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0)
|
||||||
|
if err != nil {
|
||||||
|
h.Log.Error("hook.publish", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user