mirror of
https://github.com/lucacasonato/mqtt.git
synced 2025-09-27 19:22:08 +08:00
Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
feb16264b8 | ||
![]() |
a817bc8f8a | ||
![]() |
f2aa0ffd4d | ||
![]() |
7af47fa17f | ||
![]() |
4f77e387ff | ||
![]() |
208aa854e0 |
18
.github/workflows/ci.yml
vendored
18
.github/workflows/ci.yml
vendored
@@ -4,8 +4,15 @@ on: [push]
|
|||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
name: ci
|
name: test
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto:latest
|
||||||
|
ports:
|
||||||
|
- '1883:1883'
|
||||||
|
env:
|
||||||
|
MQTT_BROKER: tcp://localhost:1883
|
||||||
steps:
|
steps:
|
||||||
- name: Set up Go 1.13
|
- name: Set up Go 1.13
|
||||||
uses: actions/setup-go@v1
|
uses: actions/setup-go@v1
|
||||||
@@ -18,8 +25,15 @@ jobs:
|
|||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: go test -race
|
run: go test -race
|
||||||
coverage:
|
coverage:
|
||||||
name: Coverage
|
name: coverage
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto:latest
|
||||||
|
ports:
|
||||||
|
- '1883:1883'
|
||||||
|
env:
|
||||||
|
MQTT_BROKER: tcp://localhost:1883
|
||||||
steps:
|
steps:
|
||||||
- name: Set up Go 1.13
|
- name: Set up Go 1.13
|
||||||
uses: actions/setup-go@v1
|
uses: actions/setup-go@v1
|
||||||
|
13
README.md
13
README.md
@@ -2,8 +2,8 @@
|
|||||||
|
|
||||||
[](http://godoc.org/github.com/lucacasonato/mqtt)
|
[](http://godoc.org/github.com/lucacasonato/mqtt)
|
||||||
[](https://github.com/lucacasonato/mqtt/actions?workflow=ci)
|
[](https://github.com/lucacasonato/mqtt/actions?workflow=ci)
|
||||||
[](https://codecov.io/gh/lucacasonato/mqtt)
|
[](https://codecov.io/gh/lucacasonato/mqtt)
|
||||||
[](https://goreportcard.com/report/github.com/lucacasonato/mqtt)
|
[](https://goreportcard.com/report/github.com/lucacasonato/mqtt)
|
||||||
|
|
||||||
An mqtt client for Go that improves usability over the [paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) library it wraps. Made for 🧑.
|
An mqtt client for Go that improves usability over the [paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) library it wraps. Made for 🧑.
|
||||||
|
|
||||||
@@ -94,6 +94,15 @@ if err != nil {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
```go
|
||||||
|
err := client.SubscribeMultiple(context.WithTimeout(1 * time.Second), map[string]mqtt.QOS{
|
||||||
|
"api/v0/main/client1": mqtt.AtLeastOnce,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### handling
|
### handling
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
2
go.mod
2
go.mod
@@ -4,6 +4,6 @@ go 1.13
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/eclipse/paho.mqtt.golang v1.2.0
|
github.com/eclipse/paho.mqtt.golang v1.2.0
|
||||||
github.com/google/uuid v1.1.1
|
github.com/google/uuid v1.1.2
|
||||||
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 // indirect
|
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 // indirect
|
||||||
)
|
)
|
||||||
|
5
go.sum
5
go.sum
@@ -2,8 +2,13 @@ github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t
|
|||||||
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
|
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
|
||||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
|
||||||
|
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 h1:p9xBe/w/OzkeYVKm234g55gMdD1nSIooTir5kV11kfA=
|
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 h1:p9xBe/w/OzkeYVKm234g55gMdD1nSIooTir5kV11kfA=
|
||||||
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
14
mqtt.go
14
mqtt.go
@@ -29,12 +29,16 @@ type ClientOptions struct {
|
|||||||
type QOS byte
|
type QOS byte
|
||||||
|
|
||||||
const (
|
const (
|
||||||
AtMostOnce QOS = iota // Deliver at most once to every subscriber - this means message delivery is not guaranteed
|
// AtMostOnce means the broker will deliver at most once to every subscriber - this means message delivery is not guaranteed
|
||||||
AtLeastOnce // Deliver a message at least once to every subscriber
|
AtMostOnce QOS = iota
|
||||||
ExactlyOnce // Deliver a message exactly once to every subscriber
|
// AtLeastOnce means the broker will deliver a message at least once to every subscriber
|
||||||
|
AtLeastOnce
|
||||||
|
// ExactlyOnce means the broker will deliver a message exactly once to every subscriber
|
||||||
|
ExactlyOnce
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// ErrMinimumOneServer means that at least one server should be specified in the client options
|
||||||
ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
|
ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -88,14 +92,14 @@ func NewClient(options ClientOptions) (*Client, error) {
|
|||||||
return &Client{client: pahoClient, Options: options, router: router}, nil
|
return &Client{client: pahoClient, Options: options, router: router}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect tries to establish a conenction with the mqtt servers
|
// Connect tries to establish a connection with the mqtt servers
|
||||||
func (c *Client) Connect(ctx context.Context) error {
|
func (c *Client) Connect(ctx context.Context) error {
|
||||||
// try to connect to the client
|
// try to connect to the client
|
||||||
token := c.client.Connect()
|
token := c.client.Connect()
|
||||||
return tokenWithContext(ctx, token)
|
return tokenWithContext(ctx, token)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disconnect will immediately close the conenction with the mqtt servers
|
// DisconnectImmediately will immediately close the connection with the mqtt servers
|
||||||
func (c *Client) DisconnectImmediately() {
|
func (c *Client) DisconnectImmediately() {
|
||||||
c.client.Disconnect(0)
|
c.client.Disconnect(0)
|
||||||
}
|
}
|
||||||
|
38
mqtt_test.go
38
mqtt_test.go
@@ -37,11 +37,11 @@ func TestNewClientNoServer(t *testing.T) {
|
|||||||
func TestNewClientBasicServer(t *testing.T) {
|
func TestNewClientBasicServer(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -52,11 +52,11 @@ func TestNewClientBasicServer(t *testing.T) {
|
|||||||
func TestNewClientNoClientID(t *testing.T) {
|
func TestNewClientNoClientID(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err should be nil but is %v", err)
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -70,12 +70,12 @@ func TestNewClientNoClientID(t *testing.T) {
|
|||||||
func TestNewClientHasClientID(t *testing.T) {
|
func TestNewClientHasClientID(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
ClientID: "client-id",
|
ClientID: "client-id",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -89,13 +89,13 @@ func TestNewClientHasClientID(t *testing.T) {
|
|||||||
func TestNewClientWithAuthentication(t *testing.T) {
|
func TestNewClientWithAuthentication(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
Username: "user",
|
Username: "user",
|
||||||
Password: "password",
|
Password: "password",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -106,18 +106,18 @@ func TestNewClientWithAuthentication(t *testing.T) {
|
|||||||
func TestConnectSuccess(t *testing.T) {
|
func TestConnectSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
}
|
}
|
||||||
err = client.Connect(ctx())
|
err = client.Connect(ctx())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("connect should not have failed")
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,11 +125,11 @@ func TestConnectSuccess(t *testing.T) {
|
|||||||
func TestConnectContextTimeout(t *testing.T) {
|
func TestConnectContextTimeout(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -146,11 +146,11 @@ func TestConnectContextTimeout(t *testing.T) {
|
|||||||
func TestConnectContextCancel(t *testing.T) {
|
func TestConnectContextCancel(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -175,7 +175,7 @@ func TestConnectFailed(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
@@ -190,18 +190,18 @@ func TestConnectFailed(t *testing.T) {
|
|||||||
func TestDisconnectImmediately(t *testing.T) {
|
func TestDisconnectImmediately(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("err should be nil")
|
t.Fatalf("creating client failed: %v", err)
|
||||||
}
|
}
|
||||||
if client == nil {
|
if client == nil {
|
||||||
t.Fatal("client should not be nil")
|
t.Fatal("client should not be nil")
|
||||||
}
|
}
|
||||||
err = client.Connect(ctx())
|
err = client.Connect(ctx())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("connect should not have failed")
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
client.DisconnectImmediately()
|
client.DisconnectImmediately()
|
||||||
}
|
}
|
||||||
|
@@ -5,20 +5,25 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// PublishOption are extra options when publishing a message
|
||||||
type PublishOption int
|
type PublishOption int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// Retain tells the broker to retain a message and send it as the first message to new subscribers.
|
||||||
Retain PublishOption = iota
|
Retain PublishOption = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Publish a message with a byte array payload
|
||||||
func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error {
|
func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error {
|
||||||
return c.publish(ctx, topic, payload, qos, options)
|
return c.publish(ctx, topic, payload, qos, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublishString publishes a message with a string payload
|
||||||
func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
|
func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
|
||||||
return c.publish(ctx, topic, []byte(payload), qos, options)
|
return c.publish(ctx, topic, []byte(payload), qos, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PublishJSON publishes a message with the payload encoded as JSON using encoding/json
|
||||||
func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
|
func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
|
||||||
data, err := json.Marshal(payload)
|
data, err := json.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -12,12 +13,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var testUUID = uuid.New().String()
|
var testUUID = uuid.New().String()
|
||||||
|
var broker = os.Getenv("MQTT_BROKER")
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if broker == "" {
|
||||||
|
broker = "tcp://localhost:1883"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestPublishSuccess checks that a message publish succeeds
|
// TestPublishSuccess checks that a message publish succeeds
|
||||||
func TestPublishSuccess(t *testing.T) {
|
func TestPublishSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -39,7 +47,7 @@ func TestPublishSuccess(t *testing.T) {
|
|||||||
func TestPublishContextTimeout(t *testing.T) {
|
func TestPublishContextTimeout(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.DisconnectImmediately()
|
defer client.DisconnectImmediately()
|
||||||
@@ -62,7 +70,7 @@ func TestPublishContextTimeout(t *testing.T) {
|
|||||||
func TestPublishContextCancelled(t *testing.T) {
|
func TestPublishContextCancelled(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.DisconnectImmediately()
|
defer client.DisconnectImmediately()
|
||||||
@@ -89,7 +97,7 @@ func TestPublishContextCancelled(t *testing.T) {
|
|||||||
func TestPublishFailed(t *testing.T) {
|
func TestPublishFailed(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.DisconnectImmediately()
|
defer client.DisconnectImmediately()
|
||||||
@@ -110,7 +118,7 @@ func TestPublishFailed(t *testing.T) {
|
|||||||
func TestPublishSuccessRetained(t *testing.T) {
|
func TestPublishSuccessRetained(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -132,7 +140,7 @@ func TestPublishSuccessRetained(t *testing.T) {
|
|||||||
func TestPublisStringSuccess(t *testing.T) {
|
func TestPublisStringSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -154,7 +162,7 @@ func TestPublisStringSuccess(t *testing.T) {
|
|||||||
func TestPublisJSONSuccess(t *testing.T) {
|
func TestPublisJSONSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -176,7 +184,7 @@ func TestPublisJSONSuccess(t *testing.T) {
|
|||||||
func TestPublisJSONFailed(t *testing.T) {
|
func TestPublisJSONFailed(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -16,6 +16,7 @@ func newRouter() *router {
|
|||||||
return &router{routes: []Route{}, lock: sync.RWMutex{}}
|
return &router{routes: []Route{}, lock: sync.RWMutex{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Route is a receipt for listening or handling certain topic
|
||||||
type Route struct {
|
type Route struct {
|
||||||
router *router
|
router *router
|
||||||
id string
|
id string
|
||||||
|
28
subscribe.go
28
subscribe.go
@@ -7,49 +7,64 @@ import (
|
|||||||
paho "github.com/eclipse/paho.mqtt.golang"
|
paho "github.com/eclipse/paho.mqtt.golang"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A Message from or to the broker
|
||||||
type Message struct {
|
type Message struct {
|
||||||
message paho.Message
|
message paho.Message
|
||||||
vars []string
|
vars []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A MessageHandler to handle incoming messages
|
||||||
type MessageHandler func(Message)
|
type MessageHandler func(Message)
|
||||||
|
|
||||||
|
// TopicVars is a list of all the message specific matches for a wildcard in a route topic.
|
||||||
|
// If the route would be `config/+/full` and the messages topic is `config/server_1/full` then thous would return `[]string{"server_1"}`
|
||||||
func (m *Message) TopicVars() []string {
|
func (m *Message) TopicVars() []string {
|
||||||
return m.vars
|
return m.vars
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Topic is the topic the message was recieved on
|
||||||
func (m *Message) Topic() string {
|
func (m *Message) Topic() string {
|
||||||
return m.message.Topic()
|
return m.message.Topic()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QOS is the quality of service the message was recieved with
|
||||||
func (m *Message) QOS() QOS {
|
func (m *Message) QOS() QOS {
|
||||||
return QOS(m.message.Qos())
|
return QOS(m.message.Qos())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsDuplicate is true if this exact message has been recieved before (due to a AtLeastOnce QOS)
|
||||||
func (m *Message) IsDuplicate() bool {
|
func (m *Message) IsDuplicate() bool {
|
||||||
return m.message.Duplicate()
|
return m.message.Duplicate()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acknowledge explicitly acknowledges to a broker that the message has been recieved
|
||||||
func (m *Message) Acknowledge() {
|
func (m *Message) Acknowledge() {
|
||||||
m.message.Ack()
|
m.message.Ack()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Payload returns the payload as a byte array
|
||||||
func (m *Message) Payload() []byte {
|
func (m *Message) Payload() []byte {
|
||||||
return m.message.Payload()
|
return m.message.Payload()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PayloadString returns the payload as a string
|
||||||
func (m *Message) PayloadString() string {
|
func (m *Message) PayloadString() string {
|
||||||
return string(m.message.Payload())
|
return string(m.message.Payload())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PayloadJSON unmarshals the payload into the provided interface using encoding/json and returns an error if anything fails
|
||||||
func (m *Message) PayloadJSON(v interface{}) error {
|
func (m *Message) PayloadJSON(v interface{}) error {
|
||||||
return json.Unmarshal(m.message.Payload(), v)
|
return json.Unmarshal(m.message.Payload(), v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle adds a handler for a certain topic. This handler gets called if any message arrives that matches the topic.
|
||||||
|
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
|
||||||
func (c *Client) Handle(topic string, handler MessageHandler) Route {
|
func (c *Client) Handle(topic string, handler MessageHandler) Route {
|
||||||
return c.router.addRoute(topic, handler)
|
return c.router.addRoute(topic, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Listen returns a stream of messages that match the topic.
|
||||||
|
// Also returns a route that can be used to unsubsribe. Does not automatically subscribe.
|
||||||
func (c *Client) Listen(topic string) (chan Message, Route) {
|
func (c *Client) Listen(topic string) (chan Message, Route) {
|
||||||
queue := make(chan Message)
|
queue := make(chan Message)
|
||||||
route := c.router.addRoute(topic, func(message Message) {
|
route := c.router.addRoute(topic, func(message Message) {
|
||||||
@@ -58,12 +73,25 @@ func (c *Client) Listen(topic string) (chan Message, Route) {
|
|||||||
return queue, route
|
return queue, route
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe subscribes to a certain topic and errors if this fails.
|
||||||
func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
|
func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
|
||||||
token := c.client.Subscribe(topic, byte(qos), nil)
|
token := c.client.Subscribe(topic, byte(qos), nil)
|
||||||
err := tokenWithContext(ctx, token)
|
err := tokenWithContext(ctx, token)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeMultiple subscribes to multiple topics and errors if this fails.
|
||||||
|
func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
|
||||||
|
subs := make(map[string]byte, len(subscriptions))
|
||||||
|
for topic, qos := range subscriptions {
|
||||||
|
subs[topic] = byte(qos)
|
||||||
|
}
|
||||||
|
token := c.client.SubscribeMultiple(subs, nil)
|
||||||
|
err := tokenWithContext(ctx, token)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe unsubscribes from a certain topic and errors if this fails.
|
||||||
func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
|
func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
|
||||||
token := c.client.Unsubscribe(topic)
|
token := c.client.Unsubscribe(topic)
|
||||||
err := tokenWithContext(ctx, token)
|
err := tokenWithContext(ctx, token)
|
||||||
|
@@ -17,7 +17,7 @@ func ctx() context.Context {
|
|||||||
func TestSubcribeSuccess(t *testing.T) {
|
func TestSubcribeSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -29,10 +29,10 @@ func TestSubcribeSuccess(t *testing.T) {
|
|||||||
t.Fatalf("connect should not have failed: %v", err)
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
reciever := make(chan mqtt.Message)
|
receiver := make(chan mqtt.Message)
|
||||||
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce)
|
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce)
|
||||||
client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) {
|
client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) {
|
||||||
reciever <- message
|
receiver <- message
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("subscribe should not have failed: %v", err)
|
t.Fatalf("subscribe should not have failed: %v", err)
|
||||||
@@ -41,7 +41,7 @@ func TestSubcribeSuccess(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("publish should not have failed: %v", err)
|
t.Fatalf("publish should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
message := <-reciever
|
message := <-receiver
|
||||||
if string(message.Payload()) != "[1, 2]" {
|
if string(message.Payload()) != "[1, 2]" {
|
||||||
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
||||||
}
|
}
|
||||||
@@ -73,11 +73,11 @@ func TestSubcribeSuccess(t *testing.T) {
|
|||||||
message.Acknowledge()
|
message.Acknowledge()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestListenSuccess checks that a listener recieves a message correctly
|
// TestSubcribeMultipleSuccess checks that a message gets recieved correctly
|
||||||
func TestListenSuccess(t *testing.T) {
|
func TestSubcribeMultipleSuccess(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -88,7 +88,67 @@ func TestListenSuccess(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("connect should not have failed: %v", err)
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
reciever, _ := client.Listen(testUUID + "/TestListenSuccess")
|
|
||||||
|
receiver := make(chan mqtt.Message)
|
||||||
|
err = client.SubscribeMultiple(ctx(), map[string]mqtt.QOS{testUUID + "/TestSubcribeMultipleSuccess/#": mqtt.ExactlyOnce})
|
||||||
|
client.Handle(testUUID+"/TestSubcribeMultipleSuccess/#", func(message mqtt.Message) {
|
||||||
|
receiver <- message
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("subscribe should not have failed: %v", err)
|
||||||
|
}
|
||||||
|
err = client.PublishString(ctx(), testUUID+"/TestSubcribeMultipleSuccess/abc", "[1, 2]", mqtt.ExactlyOnce)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("publish should not have failed: %v", err)
|
||||||
|
}
|
||||||
|
message := <-receiver
|
||||||
|
if string(message.Payload()) != "[1, 2]" {
|
||||||
|
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
||||||
|
}
|
||||||
|
if message.PayloadString() != "[1, 2]" {
|
||||||
|
t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString())
|
||||||
|
}
|
||||||
|
v := []int{}
|
||||||
|
err = message.PayloadJSON(&v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("json should have unmarshalled: %v", err)
|
||||||
|
}
|
||||||
|
if len(v) != 2 || v[0] != 1 || v[1] != 2 {
|
||||||
|
t.Fatalf("message payload should have been []int{1, 2} but is %v", v)
|
||||||
|
}
|
||||||
|
if message.Topic() != testUUID+"/TestSubcribeMultipleSuccess/abc" {
|
||||||
|
t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeMultipleSuccess/abc", message.Topic())
|
||||||
|
}
|
||||||
|
if message.QOS() != mqtt.ExactlyOnce {
|
||||||
|
t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS())
|
||||||
|
}
|
||||||
|
if message.IsDuplicate() != false {
|
||||||
|
t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate())
|
||||||
|
}
|
||||||
|
vars := message.TopicVars()
|
||||||
|
if len(vars) != 1 && vars[0] != "abc" {
|
||||||
|
t.Fatalf("message TopicVars should be ['abc'] but is %v", vars)
|
||||||
|
}
|
||||||
|
|
||||||
|
message.Acknowledge()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestListenSuccess checks that a listener recieves a message correctly
|
||||||
|
func TestListenSuccess(t *testing.T) {
|
||||||
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
|
Servers: []string{
|
||||||
|
broker,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("creating client should not have failed: %v", err)
|
||||||
|
}
|
||||||
|
err = client.Connect(ctx())
|
||||||
|
defer client.DisconnectImmediately()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
|
}
|
||||||
|
receiver, _ := client.Listen(testUUID + "/TestListenSuccess")
|
||||||
err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce)
|
err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("subscribe should not have failed: %v", err)
|
t.Fatalf("subscribe should not have failed: %v", err)
|
||||||
@@ -97,7 +157,7 @@ func TestListenSuccess(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("publish should not have failed: %v", err)
|
t.Fatalf("publish should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
message := <-reciever
|
message := <-receiver
|
||||||
if message.PayloadString() != "hello" {
|
if message.PayloadString() != "hello" {
|
||||||
t.Fatalf("message payload should have been 'hello' but is %v", message)
|
t.Fatalf("message payload should have been 'hello' but is %v", message)
|
||||||
}
|
}
|
||||||
@@ -107,7 +167,7 @@ func TestListenSuccess(t *testing.T) {
|
|||||||
func TestSubcribeFailure(t *testing.T) {
|
func TestSubcribeFailure(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -128,7 +188,7 @@ func TestSubcribeFailure(t *testing.T) {
|
|||||||
func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -139,9 +199,9 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("connect should not have failed: %v", err)
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
reciever := make(chan mqtt.Message)
|
receiver := make(chan mqtt.Message)
|
||||||
client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) {
|
client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) {
|
||||||
reciever <- message
|
receiver <- message
|
||||||
})
|
})
|
||||||
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce)
|
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -151,7 +211,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("publish should not have failed: %v", err)
|
t.Fatalf("publish should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
message := <-reciever
|
message := <-receiver
|
||||||
if message.PayloadString() != "hello world" {
|
if message.PayloadString() != "hello world" {
|
||||||
t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString())
|
t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString())
|
||||||
}
|
}
|
||||||
@@ -161,7 +221,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
|||||||
func TestSubcribeNoRecieve(t *testing.T) {
|
func TestSubcribeNoRecieve(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -190,7 +250,7 @@ func TestSubcribeNoRecieve(t *testing.T) {
|
|||||||
func TestUnsubcribe(t *testing.T) {
|
func TestUnsubcribe(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -223,7 +283,7 @@ func TestUnsubcribe(t *testing.T) {
|
|||||||
func TestRemoveRoute(t *testing.T) {
|
func TestRemoveRoute(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -234,7 +294,7 @@ func TestRemoveRoute(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("connect should not have failed: %v", err)
|
t.Fatalf("connect should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
reciever, route := client.Listen(testUUID + "/TestRemoveRoute")
|
receiver, route := client.Listen(testUUID + "/TestRemoveRoute")
|
||||||
err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce)
|
err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("subscribe should not have failed: %v", err)
|
t.Fatalf("subscribe should not have failed: %v", err)
|
||||||
@@ -243,10 +303,10 @@ func TestRemoveRoute(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("publish should not have failed: %v", err)
|
t.Fatalf("publish should not have failed: %v", err)
|
||||||
}
|
}
|
||||||
<-reciever
|
<-receiver
|
||||||
route.Stop()
|
route.Stop()
|
||||||
select {
|
select {
|
||||||
case <-reciever:
|
case <-receiver:
|
||||||
t.Fatalf("recieved a message which was not meant to happen: %v", err)
|
t.Fatalf("recieved a message which was not meant to happen: %v", err)
|
||||||
case <-time.After(500 * time.Millisecond):
|
case <-time.After(500 * time.Millisecond):
|
||||||
}
|
}
|
||||||
@@ -256,7 +316,7 @@ func TestRemoveRoute(t *testing.T) {
|
|||||||
func TestEmptyRoute(t *testing.T) {
|
func TestEmptyRoute(t *testing.T) {
|
||||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||||
Servers: []string{
|
Servers: []string{
|
||||||
"tcp://test.mosquitto.org:1883",
|
broker,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Reference in New Issue
Block a user