Subscribe to multiple topics at once (#2)

* Subscribe multiple

* Fixed CI

* Test coverage

* Fixed CI ports

* Added SubscribeMultiple to readme
This commit is contained in:
Luca Casonato
2020-02-26 15:50:48 +01:00
committed by GitHub
parent f2aa0ffd4d
commit a817bc8f8a
7 changed files with 142 additions and 37 deletions

View File

@@ -4,8 +4,15 @@ on: [push]
jobs:
test:
name: ci
name: test
runs-on: ubuntu-latest
services:
mqtt:
image: eclipse-mosquitto:latest
ports:
- '1883:1883'
env:
MQTT_BROKER: tcp://localhost:1883
steps:
- name: Set up Go 1.13
uses: actions/setup-go@v1
@@ -18,8 +25,15 @@ jobs:
- name: Run tests
run: go test -race
coverage:
name: Coverage
name: coverage
runs-on: ubuntu-latest
services:
mqtt:
image: eclipse-mosquitto:latest
ports:
- '1883:1883'
env:
MQTT_BROKER: tcp://localhost:1883
steps:
- name: Set up Go 1.13
uses: actions/setup-go@v1

View File

@@ -94,6 +94,15 @@ if err != nil {
}
```
```go
err := client.SubscribeMultiple(context.WithTimeout(1 * time.Second), map[string]mqtt.QOS{
"api/v0/main/client1": mqtt.AtLeastOnce,
})
if err != nil {
panic(err)
}
```
### handling
```go

3
go.sum
View File

@@ -2,8 +2,11 @@ github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 h1:p9xBe/w/OzkeYVKm234g55gMdD1nSIooTir5kV11kfA=
golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -37,11 +37,11 @@ func TestNewClientNoServer(t *testing.T) {
func TestNewClientBasicServer(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -52,11 +52,11 @@ func TestNewClientBasicServer(t *testing.T) {
func TestNewClientNoClientID(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatalf("err should be nil but is %v", err)
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -70,12 +70,12 @@ func TestNewClientNoClientID(t *testing.T) {
func TestNewClientHasClientID(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
ClientID: "client-id",
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -89,13 +89,13 @@ func TestNewClientHasClientID(t *testing.T) {
func TestNewClientWithAuthentication(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
Username: "user",
Password: "password",
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -106,18 +106,18 @@ func TestNewClientWithAuthentication(t *testing.T) {
func TestConnectSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
}
err = client.Connect(ctx())
if err != nil {
t.Fatal("connect should not have failed")
t.Fatalf("connect should not have failed: %v", err)
}
}
@@ -125,11 +125,11 @@ func TestConnectSuccess(t *testing.T) {
func TestConnectContextTimeout(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -146,11 +146,11 @@ func TestConnectContextTimeout(t *testing.T) {
func TestConnectContextCancel(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -175,7 +175,7 @@ func TestConnectFailed(t *testing.T) {
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
@@ -190,18 +190,18 @@ func TestConnectFailed(t *testing.T) {
func TestDisconnectImmediately(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
t.Fatal("err should be nil")
t.Fatalf("creating client failed: %v", err)
}
if client == nil {
t.Fatal("client should not be nil")
}
err = client.Connect(ctx())
if err != nil {
t.Fatal("connect should not have failed")
t.Fatalf("connect should not have failed: %v", err)
}
client.DisconnectImmediately()
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"os"
"testing"
"time"
@@ -12,12 +13,19 @@ import (
)
var testUUID = uuid.New().String()
var broker = os.Getenv("MQTT_BROKER")
func init() {
if broker == "" {
broker = "tcp://localhost:1883"
}
}
// TestPublishSuccess checks that a message publish succeeds
func TestPublishSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -39,7 +47,7 @@ func TestPublishSuccess(t *testing.T) {
func TestPublishContextTimeout(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
defer client.DisconnectImmediately()
@@ -62,7 +70,7 @@ func TestPublishContextTimeout(t *testing.T) {
func TestPublishContextCancelled(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
defer client.DisconnectImmediately()
@@ -89,7 +97,7 @@ func TestPublishContextCancelled(t *testing.T) {
func TestPublishFailed(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
defer client.DisconnectImmediately()
@@ -110,7 +118,7 @@ func TestPublishFailed(t *testing.T) {
func TestPublishSuccessRetained(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -132,7 +140,7 @@ func TestPublishSuccessRetained(t *testing.T) {
func TestPublisStringSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -154,7 +162,7 @@ func TestPublisStringSuccess(t *testing.T) {
func TestPublisJSONSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -176,7 +184,7 @@ func TestPublisJSONSuccess(t *testing.T) {
func TestPublisJSONFailed(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {

View File

@@ -80,6 +80,17 @@ func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
return err
}
// SubscribeMultiple subscribes to multiple topics and errors if this fails.
func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
subs := make(map[string]byte, len(subscriptions))
for topic, qos := range subscriptions {
subs[topic] = byte(qos)
}
token := c.client.SubscribeMultiple(subs, nil)
err := tokenWithContext(ctx, token)
return err
}
// Unsubscribe unsubscribes from a certain topic and errors if this fails.
func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
token := c.client.Unsubscribe(topic)

View File

@@ -17,7 +17,7 @@ func ctx() context.Context {
func TestSubcribeSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -73,11 +73,71 @@ func TestSubcribeSuccess(t *testing.T) {
message.Acknowledge()
}
// TestSubcribeMultipleSuccess checks that a message gets recieved correctly
func TestSubcribeMultipleSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
broker,
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(ctx())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
receiver := make(chan mqtt.Message)
err = client.SubscribeMultiple(ctx(), map[string]mqtt.QOS{testUUID + "/TestSubcribeMultipleSuccess/#": mqtt.ExactlyOnce})
client.Handle(testUUID+"/TestSubcribeMultipleSuccess/#", func(message mqtt.Message) {
receiver <- message
})
if err != nil {
t.Fatalf("subscribe should not have failed: %v", err)
}
err = client.PublishString(ctx(), testUUID+"/TestSubcribeMultipleSuccess/abc", "[1, 2]", mqtt.ExactlyOnce)
if err != nil {
t.Fatalf("publish should not have failed: %v", err)
}
message := <-receiver
if string(message.Payload()) != "[1, 2]" {
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
}
if message.PayloadString() != "[1, 2]" {
t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString())
}
v := []int{}
err = message.PayloadJSON(&v)
if err != nil {
t.Fatalf("json should have unmarshalled: %v", err)
}
if len(v) != 2 || v[0] != 1 || v[1] != 2 {
t.Fatalf("message payload should have been []int{1, 2} but is %v", v)
}
if message.Topic() != testUUID+"/TestSubcribeMultipleSuccess/abc" {
t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeMultipleSuccess/abc", message.Topic())
}
if message.QOS() != mqtt.ExactlyOnce {
t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS())
}
if message.IsDuplicate() != false {
t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate())
}
vars := message.TopicVars()
if len(vars) != 1 && vars[0] != "abc" {
t.Fatalf("message TopicVars should be ['abc'] but is %v", vars)
}
message.Acknowledge()
}
// TestListenSuccess checks that a listener recieves a message correctly
func TestListenSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -107,7 +167,7 @@ func TestListenSuccess(t *testing.T) {
func TestSubcribeFailure(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -128,7 +188,7 @@ func TestSubcribeFailure(t *testing.T) {
func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -161,7 +221,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
func TestSubcribeNoRecieve(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -190,7 +250,7 @@ func TestSubcribeNoRecieve(t *testing.T) {
func TestUnsubcribe(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -223,7 +283,7 @@ func TestUnsubcribe(t *testing.T) {
func TestRemoveRoute(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {
@@ -256,7 +316,7 @@ func TestRemoveRoute(t *testing.T) {
func TestEmptyRoute(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
broker,
},
})
if err != nil {