diff --git a/mqtt.go b/mqtt.go index ece42ab..c5a7402 100644 --- a/mqtt.go +++ b/mqtt.go @@ -29,12 +29,16 @@ type ClientOptions struct { type QOS byte const ( - AtMostOnce QOS = iota // Deliver at most once to every subscriber - this means message delivery is not guaranteed - AtLeastOnce // Deliver a message at least once to every subscriber - ExactlyOnce // Deliver a message exactly once to every subscriber + // AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed + AtMostOnce QOS = iota + // AtLeastOnce means the broker will deliver a message at least once to every subscriber + AtLeastOnce + // ExactlyOnce means the broker will deliver a message exactly once to every subscriber + ExactlyOnce ) var ( + // ErrMinimumOneServer means that at least one server should be specified in the client options ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified") ) @@ -88,14 +92,14 @@ func NewClient(options ClientOptions) (*Client, error) { return &Client{client: pahoClient, Options: options, router: router}, nil } -// Connect tries to establish a conenction with the mqtt servers +// Connect tries to establish a connection with the mqtt servers func (c *Client) Connect(ctx context.Context) error { // try to connect to the client token := c.client.Connect() return tokenWithContext(ctx, token) } -// Disconnect will immediately close the conenction with the mqtt servers +// DisconnectImmediately will immediately close the connection with the mqtt servers func (c *Client) DisconnectImmediately() { c.client.Disconnect(0) } diff --git a/publish.go b/publish.go index 2f0d0a8..ab4dfac 100644 --- a/publish.go +++ b/publish.go @@ -5,20 +5,25 @@ import ( "encoding/json" ) +// PublishOption are extra options when publishing a message type PublishOption int const ( + // Retain tells the broker to retain a message and send it as the first message to new subscribers. Retain PublishOption = iota ) +// Publish a message with a byte array payload func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error { return c.publish(ctx, topic, payload, qos, options) } +// PublishString publishes a message with a string payload func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error { return c.publish(ctx, topic, []byte(payload), qos, options) } +// PublishJSON publishes a message with the payload encoded as JSON using encoding/json func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error { data, err := json.Marshal(payload) if err != nil { diff --git a/router.go b/router.go index 00b3f1e..6c0c21e 100644 --- a/router.go +++ b/router.go @@ -16,6 +16,7 @@ func newRouter() *router { return &router{routes: []Route{}, lock: sync.RWMutex{}} } +// Route is a receipt for listening or handling certain topic type Route struct { router *router id string diff --git a/subscribe.go b/subscribe.go index ba1ff14..6ab24f9 100644 --- a/subscribe.go +++ b/subscribe.go @@ -7,49 +7,64 @@ import ( paho "github.com/eclipse/paho.mqtt.golang" ) +// A Message from or to the broker type Message struct { message paho.Message vars []string } +// A MessageHandler to handle incoming messages type MessageHandler func(Message) +// TopicVars is a list of all the message specific matches for a wildcard in a route topic. +// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}` func (m *Message) TopicVars() []string { return m.vars } +// Topic is the topic the message was recieved on func (m *Message) Topic() string { return m.message.Topic() } +// QOS is the quality of service the message was recieved with func (m *Message) QOS() QOS { return QOS(m.message.Qos()) } +// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS) func (m *Message) IsDuplicate() bool { return m.message.Duplicate() } +// Acknowledge explicitly acknowledges to a broker that the message has been recieved func (m *Message) Acknowledge() { m.message.Ack() } +// Payload returns the payload as a byte array func (m *Message) Payload() []byte { return m.message.Payload() } +// PayloadString returns the payload as a string func (m *Message) PayloadString() string { return string(m.message.Payload()) } +// PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails func (m *Message) PayloadJSON(v interface{}) error { return json.Unmarshal(m.message.Payload(), v) } +// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic. +// Also returns a route that can be used to unsubsribe. Does not automatically subscribe. func (c *Client) Handle(topic string, handler MessageHandler) Route { return c.router.addRoute(topic, handler) } +// Listen returns a stream of messages that match the topic. +// Also returns a route that can be used to unsubsribe. Does not automatically subscribe. func (c *Client) Listen(topic string) (chan Message, Route) { queue := make(chan Message) route := c.router.addRoute(topic, func(message Message) { @@ -58,12 +73,14 @@ func (c *Client) Listen(topic string) (chan Message, Route) { return queue, route } +// Subscribe subscribes to a certain topic and errors if this fails. func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error { token := c.client.Subscribe(topic, byte(qos), nil) err := tokenWithContext(ctx, token) return err } +// Unsubscribe unsubscribes from a certain topic and errors if this fails. func (c *Client) Unsubscribe(ctx context.Context, topic string) error { token := c.client.Unsubscribe(topic) err := tokenWithContext(ctx, token) diff --git a/subscribe_test.go b/subscribe_test.go index 3b4b85c..3c1f9b5 100644 --- a/subscribe_test.go +++ b/subscribe_test.go @@ -29,10 +29,10 @@ func TestSubcribeSuccess(t *testing.T) { t.Fatalf("connect should not have failed: %v", err) } - reciever := make(chan mqtt.Message) + receiver := make(chan mqtt.Message) err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce) client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) { - reciever <- message + receiver <- message }) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) @@ -41,7 +41,7 @@ func TestSubcribeSuccess(t *testing.T) { if err != nil { t.Fatalf("publish should not have failed: %v", err) } - message := <-reciever + message := <-receiver if string(message.Payload()) != "[1, 2]" { t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload()) } @@ -88,7 +88,7 @@ func TestListenSuccess(t *testing.T) { if err != nil { t.Fatalf("connect should not have failed: %v", err) } - reciever, _ := client.Listen(testUUID + "/TestListenSuccess") + receiver, _ := client.Listen(testUUID + "/TestListenSuccess") err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) @@ -97,7 +97,7 @@ func TestListenSuccess(t *testing.T) { if err != nil { t.Fatalf("publish should not have failed: %v", err) } - message := <-reciever + message := <-receiver if message.PayloadString() != "hello" { t.Fatalf("message payload should have been 'hello' but is %v", message) } @@ -139,9 +139,9 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) { if err != nil { t.Fatalf("connect should not have failed: %v", err) } - reciever := make(chan mqtt.Message) + receiver := make(chan mqtt.Message) client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) { - reciever <- message + receiver <- message }) err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce) if err != nil { @@ -151,7 +151,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) { if err != nil { t.Fatalf("publish should not have failed: %v", err) } - message := <-reciever + message := <-receiver if message.PayloadString() != "hello world" { t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString()) } @@ -234,7 +234,7 @@ func TestRemoveRoute(t *testing.T) { if err != nil { t.Fatalf("connect should not have failed: %v", err) } - reciever, route := client.Listen(testUUID + "/TestRemoveRoute") + receiver, route := client.Listen(testUUID + "/TestRemoveRoute") err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) @@ -243,10 +243,10 @@ func TestRemoveRoute(t *testing.T) { if err != nil { t.Fatalf("publish should not have failed: %v", err) } - <-reciever + <-receiver route.Stop() select { - case <-reciever: + case <-receiver: t.Fatalf("recieved a message which was not meant to happen: %v", err) case <-time.After(500 * time.Millisecond): }