From a817bc8f8a61a6441e771d7032f3a1b4cd1a6081 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Wed, 26 Feb 2020 15:50:48 +0100 Subject: [PATCH] Subscribe to multiple topics at once (#2) * Subscribe multiple * Fixed CI * Test coverage * Fixed CI ports * Added SubscribeMultiple to readme --- .github/workflows/ci.yml | 18 ++++++++-- README.md | 9 +++++ go.sum | 3 ++ mqtt_test.go | 38 ++++++++++---------- publish_test.go | 24 ++++++++----- subscribe.go | 11 ++++++ subscribe_test.go | 76 +++++++++++++++++++++++++++++++++++----- 7 files changed, 142 insertions(+), 37 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60f379f..8cc07c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,8 +4,15 @@ on: [push] jobs: test: - name: ci + name: test runs-on: ubuntu-latest + services: + mqtt: + image: eclipse-mosquitto:latest + ports: + - '1883:1883' + env: + MQTT_BROKER: tcp://localhost:1883 steps: - name: Set up Go 1.13 uses: actions/setup-go@v1 @@ -18,8 +25,15 @@ jobs: - name: Run tests run: go test -race coverage: - name: Coverage + name: coverage runs-on: ubuntu-latest + services: + mqtt: + image: eclipse-mosquitto:latest + ports: + - '1883:1883' + env: + MQTT_BROKER: tcp://localhost:1883 steps: - name: Set up Go 1.13 uses: actions/setup-go@v1 diff --git a/README.md b/README.md index 6e55909..e24c2a4 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,15 @@ if err != nil { } ``` +```go +err := client.SubscribeMultiple(context.WithTimeout(1 * time.Second), map[string]mqtt.QOS{ + "api/v0/main/client1": mqtt.AtLeastOnce, +}) +if err != nil { + panic(err) +} +``` + ### handling ```go diff --git a/go.sum b/go.sum index a517b93..e2d4860 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,11 @@ github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 h1:p9xBe/w/OzkeYVKm234g55gMdD1nSIooTir5kV11kfA= golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/mqtt_test.go b/mqtt_test.go index b20e768..5c936c5 100644 --- a/mqtt_test.go +++ b/mqtt_test.go @@ -37,11 +37,11 @@ func TestNewClientNoServer(t *testing.T) { func TestNewClientBasicServer(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -52,11 +52,11 @@ func TestNewClientBasicServer(t *testing.T) { func TestNewClientNoClientID(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatalf("err should be nil but is %v", err) + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -70,12 +70,12 @@ func TestNewClientNoClientID(t *testing.T) { func TestNewClientHasClientID(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, ClientID: "client-id", }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -89,13 +89,13 @@ func TestNewClientHasClientID(t *testing.T) { func TestNewClientWithAuthentication(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, Username: "user", Password: "password", }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -106,18 +106,18 @@ func TestNewClientWithAuthentication(t *testing.T) { func TestConnectSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") } err = client.Connect(ctx()) if err != nil { - t.Fatal("connect should not have failed") + t.Fatalf("connect should not have failed: %v", err) } } @@ -125,11 +125,11 @@ func TestConnectSuccess(t *testing.T) { func TestConnectContextTimeout(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -146,11 +146,11 @@ func TestConnectContextTimeout(t *testing.T) { func TestConnectContextCancel(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -175,7 +175,7 @@ func TestConnectFailed(t *testing.T) { }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") @@ -190,18 +190,18 @@ func TestConnectFailed(t *testing.T) { func TestDisconnectImmediately(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { - t.Fatal("err should be nil") + t.Fatalf("creating client failed: %v", err) } if client == nil { t.Fatal("client should not be nil") } err = client.Connect(ctx()) if err != nil { - t.Fatal("connect should not have failed") + t.Fatalf("connect should not have failed: %v", err) } client.DisconnectImmediately() } diff --git a/publish_test.go b/publish_test.go index 3ecd8e1..47c7e8c 100644 --- a/publish_test.go +++ b/publish_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "os" "testing" "time" @@ -12,12 +13,19 @@ import ( ) var testUUID = uuid.New().String() +var broker = os.Getenv("MQTT_BROKER") + +func init() { + if broker == "" { + broker = "tcp://localhost:1883" + } +} // TestPublishSuccess checks that a message publish succeeds func TestPublishSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -39,7 +47,7 @@ func TestPublishSuccess(t *testing.T) { func TestPublishContextTimeout(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) defer client.DisconnectImmediately() @@ -62,7 +70,7 @@ func TestPublishContextTimeout(t *testing.T) { func TestPublishContextCancelled(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) defer client.DisconnectImmediately() @@ -89,7 +97,7 @@ func TestPublishContextCancelled(t *testing.T) { func TestPublishFailed(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) defer client.DisconnectImmediately() @@ -110,7 +118,7 @@ func TestPublishFailed(t *testing.T) { func TestPublishSuccessRetained(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -132,7 +140,7 @@ func TestPublishSuccessRetained(t *testing.T) { func TestPublisStringSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -154,7 +162,7 @@ func TestPublisStringSuccess(t *testing.T) { func TestPublisJSONSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -176,7 +184,7 @@ func TestPublisJSONSuccess(t *testing.T) { func TestPublisJSONFailed(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { diff --git a/subscribe.go b/subscribe.go index 6ab24f9..bb8a3e4 100644 --- a/subscribe.go +++ b/subscribe.go @@ -80,6 +80,17 @@ func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error { return err } +// SubscribeMultiple subscribes to multiple topics and errors if this fails. +func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error { + subs := make(map[string]byte, len(subscriptions)) + for topic, qos := range subscriptions { + subs[topic] = byte(qos) + } + token := c.client.SubscribeMultiple(subs, nil) + err := tokenWithContext(ctx, token) + return err +} + // Unsubscribe unsubscribes from a certain topic and errors if this fails. func (c *Client) Unsubscribe(ctx context.Context, topic string) error { token := c.client.Unsubscribe(topic) diff --git a/subscribe_test.go b/subscribe_test.go index 3c1f9b5..3eb85ab 100644 --- a/subscribe_test.go +++ b/subscribe_test.go @@ -17,7 +17,7 @@ func ctx() context.Context { func TestSubcribeSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -73,11 +73,71 @@ func TestSubcribeSuccess(t *testing.T) { message.Acknowledge() } +// TestSubcribeMultipleSuccess checks that a message gets recieved correctly +func TestSubcribeMultipleSuccess(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + broker, + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(ctx()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + receiver := make(chan mqtt.Message) + err = client.SubscribeMultiple(ctx(), map[string]mqtt.QOS{testUUID + "/TestSubcribeMultipleSuccess/#": mqtt.ExactlyOnce}) + client.Handle(testUUID+"/TestSubcribeMultipleSuccess/#", func(message mqtt.Message) { + receiver <- message + }) + if err != nil { + t.Fatalf("subscribe should not have failed: %v", err) + } + err = client.PublishString(ctx(), testUUID+"/TestSubcribeMultipleSuccess/abc", "[1, 2]", mqtt.ExactlyOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } + message := <-receiver + if string(message.Payload()) != "[1, 2]" { + t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload()) + } + if message.PayloadString() != "[1, 2]" { + t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString()) + } + v := []int{} + err = message.PayloadJSON(&v) + if err != nil { + t.Fatalf("json should have unmarshalled: %v", err) + } + if len(v) != 2 || v[0] != 1 || v[1] != 2 { + t.Fatalf("message payload should have been []int{1, 2} but is %v", v) + } + if message.Topic() != testUUID+"/TestSubcribeMultipleSuccess/abc" { + t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeMultipleSuccess/abc", message.Topic()) + } + if message.QOS() != mqtt.ExactlyOnce { + t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS()) + } + if message.IsDuplicate() != false { + t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate()) + } + vars := message.TopicVars() + if len(vars) != 1 && vars[0] != "abc" { + t.Fatalf("message TopicVars should be ['abc'] but is %v", vars) + } + + message.Acknowledge() +} + // TestListenSuccess checks that a listener recieves a message correctly func TestListenSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -107,7 +167,7 @@ func TestListenSuccess(t *testing.T) { func TestSubcribeFailure(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -128,7 +188,7 @@ func TestSubcribeFailure(t *testing.T) { func TestSubcribeSuccessAdvancedRouting(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -161,7 +221,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) { func TestSubcribeNoRecieve(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -190,7 +250,7 @@ func TestSubcribeNoRecieve(t *testing.T) { func TestUnsubcribe(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -223,7 +283,7 @@ func TestUnsubcribe(t *testing.T) { func TestRemoveRoute(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil { @@ -256,7 +316,7 @@ func TestRemoveRoute(t *testing.T) { func TestEmptyRoute(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ - "tcp://test.mosquitto.org:1883", + broker, }, }) if err != nil {