mirror of
https://github.com/lucacasonato/mqtt.git
synced 2025-09-27 03:05:59 +08:00

* Subscribe multiple * Fixed CI * Test coverage * Fixed CI ports * Added SubscribeMultiple to readme
100 lines
3.1 KiB
Go
100 lines
3.1 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
paho "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// A Message from or to the broker
|
|
type Message struct {
|
|
message paho.Message
|
|
vars []string
|
|
}
|
|
|
|
// A MessageHandler to handle incoming messages
|
|
type MessageHandler func(Message)
|
|
|
|
// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
|
|
// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
|
|
func (m *Message) TopicVars() []string {
|
|
return m.vars
|
|
}
|
|
|
|
// Topic is the topic the message was recieved on
|
|
func (m *Message) Topic() string {
|
|
return m.message.Topic()
|
|
}
|
|
|
|
// QOS is the quality of service the message was recieved with
|
|
func (m *Message) QOS() QOS {
|
|
return QOS(m.message.Qos())
|
|
}
|
|
|
|
// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
|
|
func (m *Message) IsDuplicate() bool {
|
|
return m.message.Duplicate()
|
|
}
|
|
|
|
// Acknowledge explicitly acknowledges to a broker that the message has been recieved
|
|
func (m *Message) Acknowledge() {
|
|
m.message.Ack()
|
|
}
|
|
|
|
// Payload returns the payload as a byte array
|
|
func (m *Message) Payload() []byte {
|
|
return m.message.Payload()
|
|
}
|
|
|
|
// PayloadString returns the payload as a string
|
|
func (m *Message) PayloadString() string {
|
|
return string(m.message.Payload())
|
|
}
|
|
|
|
// PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
|
|
func (m *Message) PayloadJSON(v interface{}) error {
|
|
return json.Unmarshal(m.message.Payload(), v)
|
|
}
|
|
|
|
// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
|
|
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
|
|
func (c *Client) Handle(topic string, handler MessageHandler) Route {
|
|
return c.router.addRoute(topic, handler)
|
|
}
|
|
|
|
// Listen returns a stream of messages that match the topic.
|
|
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
|
|
func (c *Client) Listen(topic string) (chan Message, Route) {
|
|
queue := make(chan Message)
|
|
route := c.router.addRoute(topic, func(message Message) {
|
|
queue <- message
|
|
})
|
|
return queue, route
|
|
}
|
|
|
|
// Subscribe subscribes to a certain topic and errors if this fails.
|
|
func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
|
|
token := c.client.Subscribe(topic, byte(qos), nil)
|
|
err := tokenWithContext(ctx, token)
|
|
return err
|
|
}
|
|
|
|
// SubscribeMultiple subscribes to multiple topics and errors if this fails.
|
|
func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
|
|
subs := make(map[string]byte, len(subscriptions))
|
|
for topic, qos := range subscriptions {
|
|
subs[topic] = byte(qos)
|
|
}
|
|
token := c.client.SubscribeMultiple(subs, nil)
|
|
err := tokenWithContext(ctx, token)
|
|
return err
|
|
}
|
|
|
|
// Unsubscribe unsubscribes from a certain topic and errors if this fails.
|
|
func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
|
|
token := c.client.Unsubscribe(topic)
|
|
err := tokenWithContext(ctx, token)
|
|
return err
|
|
}
|