Merge pull request #1 from lucacasonato/fix-golint-issues

Made golint complain less
This commit is contained in:
Luca Casonato
2019-11-01 22:20:04 +01:00
committed by GitHub
5 changed files with 43 additions and 16 deletions

14
mqtt.go
View File

@@ -29,12 +29,16 @@ type ClientOptions struct {
type QOS byte type QOS byte
const ( const (
AtMostOnce QOS = iota // Deliver at most once to every subscriber - this means message delivery is not guaranteed // AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
AtLeastOnce // Deliver a message at least once to every subscriber AtMostOnce QOS = iota
ExactlyOnce // Deliver a message exactly once to every subscriber // AtLeastOnce means the broker will deliver a message at least once to every subscriber
AtLeastOnce
// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
ExactlyOnce
) )
var ( var (
// ErrMinimumOneServer means that at least one server should be specified in the client options
ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified") ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
) )
@@ -88,14 +92,14 @@ func NewClient(options ClientOptions) (*Client, error) {
return &Client{client: pahoClient, Options: options, router: router}, nil return &Client{client: pahoClient, Options: options, router: router}, nil
} }
// Connect tries to establish a conenction with the mqtt servers // Connect tries to establish a connection with the mqtt servers
func (c *Client) Connect(ctx context.Context) error { func (c *Client) Connect(ctx context.Context) error {
// try to connect to the client // try to connect to the client
token := c.client.Connect() token := c.client.Connect()
return tokenWithContext(ctx, token) return tokenWithContext(ctx, token)
} }
// Disconnect will immediately close the conenction with the mqtt servers // DisconnectImmediately will immediately close the connection with the mqtt servers
func (c *Client) DisconnectImmediately() { func (c *Client) DisconnectImmediately() {
c.client.Disconnect(0) c.client.Disconnect(0)
} }

View File

@@ -5,20 +5,25 @@ import (
"encoding/json" "encoding/json"
) )
// PublishOption are extra options when publishing a message
type PublishOption int type PublishOption int
const ( const (
// Retain tells the broker to retain a message and send it as the first message to new subscribers.
Retain PublishOption = iota Retain PublishOption = iota
) )
// Publish a message with a byte array payload
func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error { func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error {
return c.publish(ctx, topic, payload, qos, options) return c.publish(ctx, topic, payload, qos, options)
} }
// PublishString publishes a message with a string payload
func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error { func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
return c.publish(ctx, topic, []byte(payload), qos, options) return c.publish(ctx, topic, []byte(payload), qos, options)
} }
// PublishJSON publishes a message with the payload encoded as JSON using encoding/json
func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error { func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
data, err := json.Marshal(payload) data, err := json.Marshal(payload)
if err != nil { if err != nil {

View File

@@ -16,6 +16,7 @@ func newRouter() *router {
return &router{routes: []Route{}, lock: sync.RWMutex{}} return &router{routes: []Route{}, lock: sync.RWMutex{}}
} }
// Route is a receipt for listening or handling certain topic
type Route struct { type Route struct {
router *router router *router
id string id string

View File

@@ -7,49 +7,64 @@ import (
paho "github.com/eclipse/paho.mqtt.golang" paho "github.com/eclipse/paho.mqtt.golang"
) )
// A Message from or to the broker
type Message struct { type Message struct {
message paho.Message message paho.Message
vars []string vars []string
} }
// A MessageHandler to handle incoming messages
type MessageHandler func(Message) type MessageHandler func(Message)
// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
func (m *Message) TopicVars() []string { func (m *Message) TopicVars() []string {
return m.vars return m.vars
} }
// Topic is the topic the message was recieved on
func (m *Message) Topic() string { func (m *Message) Topic() string {
return m.message.Topic() return m.message.Topic()
} }
// QOS is the quality of service the message was recieved with
func (m *Message) QOS() QOS { func (m *Message) QOS() QOS {
return QOS(m.message.Qos()) return QOS(m.message.Qos())
} }
// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
func (m *Message) IsDuplicate() bool { func (m *Message) IsDuplicate() bool {
return m.message.Duplicate() return m.message.Duplicate()
} }
// Acknowledge explicitly acknowledges to a broker that the message has been recieved
func (m *Message) Acknowledge() { func (m *Message) Acknowledge() {
m.message.Ack() m.message.Ack()
} }
// Payload returns the payload as a byte array
func (m *Message) Payload() []byte { func (m *Message) Payload() []byte {
return m.message.Payload() return m.message.Payload()
} }
// PayloadString returns the payload as a string
func (m *Message) PayloadString() string { func (m *Message) PayloadString() string {
return string(m.message.Payload()) return string(m.message.Payload())
} }
// PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
func (m *Message) PayloadJSON(v interface{}) error { func (m *Message) PayloadJSON(v interface{}) error {
return json.Unmarshal(m.message.Payload(), v) return json.Unmarshal(m.message.Payload(), v)
} }
// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
func (c *Client) Handle(topic string, handler MessageHandler) Route { func (c *Client) Handle(topic string, handler MessageHandler) Route {
return c.router.addRoute(topic, handler) return c.router.addRoute(topic, handler)
} }
// Listen returns a stream of messages that match the topic.
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
func (c *Client) Listen(topic string) (chan Message, Route) { func (c *Client) Listen(topic string) (chan Message, Route) {
queue := make(chan Message) queue := make(chan Message)
route := c.router.addRoute(topic, func(message Message) { route := c.router.addRoute(topic, func(message Message) {
@@ -58,12 +73,14 @@ func (c *Client) Listen(topic string) (chan Message, Route) {
return queue, route return queue, route
} }
// Subscribe subscribes to a certain topic and errors if this fails.
func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error { func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
token := c.client.Subscribe(topic, byte(qos), nil) token := c.client.Subscribe(topic, byte(qos), nil)
err := tokenWithContext(ctx, token) err := tokenWithContext(ctx, token)
return err return err
} }
// Unsubscribe unsubscribes from a certain topic and errors if this fails.
func (c *Client) Unsubscribe(ctx context.Context, topic string) error { func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
token := c.client.Unsubscribe(topic) token := c.client.Unsubscribe(topic)
err := tokenWithContext(ctx, token) err := tokenWithContext(ctx, token)

View File

@@ -29,10 +29,10 @@ func TestSubcribeSuccess(t *testing.T) {
t.Fatalf("connect should not have failed: %v", err) t.Fatalf("connect should not have failed: %v", err)
} }
reciever := make(chan mqtt.Message) receiver := make(chan mqtt.Message)
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce) err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce)
client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) { client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) {
reciever <- message receiver <- message
}) })
if err != nil { if err != nil {
t.Fatalf("subscribe should not have failed: %v", err) t.Fatalf("subscribe should not have failed: %v", err)
@@ -41,7 +41,7 @@ func TestSubcribeSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("publish should not have failed: %v", err) t.Fatalf("publish should not have failed: %v", err)
} }
message := <-reciever message := <-receiver
if string(message.Payload()) != "[1, 2]" { if string(message.Payload()) != "[1, 2]" {
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload()) t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
} }
@@ -88,7 +88,7 @@ func TestListenSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("connect should not have failed: %v", err) t.Fatalf("connect should not have failed: %v", err)
} }
reciever, _ := client.Listen(testUUID + "/TestListenSuccess") receiver, _ := client.Listen(testUUID + "/TestListenSuccess")
err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce) err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce)
if err != nil { if err != nil {
t.Fatalf("subscribe should not have failed: %v", err) t.Fatalf("subscribe should not have failed: %v", err)
@@ -97,7 +97,7 @@ func TestListenSuccess(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("publish should not have failed: %v", err) t.Fatalf("publish should not have failed: %v", err)
} }
message := <-reciever message := <-receiver
if message.PayloadString() != "hello" { if message.PayloadString() != "hello" {
t.Fatalf("message payload should have been 'hello' but is %v", message) t.Fatalf("message payload should have been 'hello' but is %v", message)
} }
@@ -139,9 +139,9 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("connect should not have failed: %v", err) t.Fatalf("connect should not have failed: %v", err)
} }
reciever := make(chan mqtt.Message) receiver := make(chan mqtt.Message)
client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) { client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) {
reciever <- message receiver <- message
}) })
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce) err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce)
if err != nil { if err != nil {
@@ -151,7 +151,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("publish should not have failed: %v", err) t.Fatalf("publish should not have failed: %v", err)
} }
message := <-reciever message := <-receiver
if message.PayloadString() != "hello world" { if message.PayloadString() != "hello world" {
t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString()) t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString())
} }
@@ -234,7 +234,7 @@ func TestRemoveRoute(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("connect should not have failed: %v", err) t.Fatalf("connect should not have failed: %v", err)
} }
reciever, route := client.Listen(testUUID + "/TestRemoveRoute") receiver, route := client.Listen(testUUID + "/TestRemoveRoute")
err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce) err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce)
if err != nil { if err != nil {
t.Fatalf("subscribe should not have failed: %v", err) t.Fatalf("subscribe should not have failed: %v", err)
@@ -243,10 +243,10 @@ func TestRemoveRoute(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("publish should not have failed: %v", err) t.Fatalf("publish should not have failed: %v", err)
} }
<-reciever <-receiver
route.Stop() route.Stop()
select { select {
case <-reciever: case <-receiver:
t.Fatalf("recieved a message which was not meant to happen: %v", err) t.Fatalf("recieved a message which was not meant to happen: %v", err)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
} }