mirror of
https://github.com/lucacasonato/mqtt.git
synced 2025-09-26 19:01:12 +08:00

* Subscribe multiple * Fixed CI * Test coverage * Fixed CI ports * Added SubscribeMultiple to readme
327 lines
10 KiB
Go
327 lines
10 KiB
Go
package mqtt_test
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/lucacasonato/mqtt"
|
|
)
|
|
|
|
func ctx() context.Context {
|
|
c, _ := context.WithTimeout(context.Background(), 1*time.Second)
|
|
return c
|
|
}
|
|
|
|
// TestSubcribeSuccess checks that a message gets recieved correctly
|
|
func TestSubcribeSuccess(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
|
|
receiver := make(chan mqtt.Message)
|
|
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce)
|
|
client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) {
|
|
receiver <- message
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestSubcribeSuccess/abc", "[1, 2]", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
message := <-receiver
|
|
if string(message.Payload()) != "[1, 2]" {
|
|
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
|
}
|
|
if message.PayloadString() != "[1, 2]" {
|
|
t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString())
|
|
}
|
|
v := []int{}
|
|
err = message.PayloadJSON(&v)
|
|
if err != nil {
|
|
t.Fatalf("json should have unmarshalled: %v", err)
|
|
}
|
|
if len(v) != 2 || v[0] != 1 || v[1] != 2 {
|
|
t.Fatalf("message payload should have been []int{1, 2} but is %v", v)
|
|
}
|
|
if message.Topic() != testUUID+"/TestSubcribeSuccess/abc" {
|
|
t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeSuccess/abc", message.Topic())
|
|
}
|
|
if message.QOS() != mqtt.ExactlyOnce {
|
|
t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS())
|
|
}
|
|
if message.IsDuplicate() != false {
|
|
t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate())
|
|
}
|
|
vars := message.TopicVars()
|
|
if len(vars) != 1 && vars[0] != "abc" {
|
|
t.Fatalf("message TopicVars should be ['abc'] but is %v", vars)
|
|
}
|
|
|
|
message.Acknowledge()
|
|
}
|
|
|
|
// TestSubcribeMultipleSuccess checks that a message gets recieved correctly
|
|
func TestSubcribeMultipleSuccess(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
|
|
receiver := make(chan mqtt.Message)
|
|
err = client.SubscribeMultiple(ctx(), map[string]mqtt.QOS{testUUID + "/TestSubcribeMultipleSuccess/#": mqtt.ExactlyOnce})
|
|
client.Handle(testUUID+"/TestSubcribeMultipleSuccess/#", func(message mqtt.Message) {
|
|
receiver <- message
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestSubcribeMultipleSuccess/abc", "[1, 2]", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
message := <-receiver
|
|
if string(message.Payload()) != "[1, 2]" {
|
|
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
|
}
|
|
if message.PayloadString() != "[1, 2]" {
|
|
t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString())
|
|
}
|
|
v := []int{}
|
|
err = message.PayloadJSON(&v)
|
|
if err != nil {
|
|
t.Fatalf("json should have unmarshalled: %v", err)
|
|
}
|
|
if len(v) != 2 || v[0] != 1 || v[1] != 2 {
|
|
t.Fatalf("message payload should have been []int{1, 2} but is %v", v)
|
|
}
|
|
if message.Topic() != testUUID+"/TestSubcribeMultipleSuccess/abc" {
|
|
t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeMultipleSuccess/abc", message.Topic())
|
|
}
|
|
if message.QOS() != mqtt.ExactlyOnce {
|
|
t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS())
|
|
}
|
|
if message.IsDuplicate() != false {
|
|
t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate())
|
|
}
|
|
vars := message.TopicVars()
|
|
if len(vars) != 1 && vars[0] != "abc" {
|
|
t.Fatalf("message TopicVars should be ['abc'] but is %v", vars)
|
|
}
|
|
|
|
message.Acknowledge()
|
|
}
|
|
|
|
// TestListenSuccess checks that a listener recieves a message correctly
|
|
func TestListenSuccess(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
receiver, _ := client.Listen(testUUID + "/TestListenSuccess")
|
|
err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestListenSuccess", "hello", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
message := <-receiver
|
|
if message.PayloadString() != "hello" {
|
|
t.Fatalf("message payload should have been 'hello' but is %v", message)
|
|
}
|
|
}
|
|
|
|
// TestSubcribeSuccess checks that a message gets recieved correctly
|
|
func TestSubcribeFailure(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
err = client.Subscribe(ctx(), testUUID+"/#/test_publish", mqtt.ExactlyOnce) // # in the middle of a subscribe is not allowed
|
|
if err == nil {
|
|
t.Fatalf("subscribe should have failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// TestSubcribeSuccess checks that a message gets recieved correctly
|
|
func TestSubcribeSuccessAdvancedRouting(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
receiver := make(chan mqtt.Message)
|
|
client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) {
|
|
receiver <- message
|
|
})
|
|
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/abc", "hello world", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
message := <-receiver
|
|
if message.PayloadString() != "hello world" {
|
|
t.Fatalf("message payload should have been 'hello world' but is %v", message.PayloadString())
|
|
}
|
|
}
|
|
|
|
// TestSubcribeNoRecieve checks that a message does not get recieved when it is not listening
|
|
func TestSubcribeNoRecieve(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
client.Handle(testUUID+"/TestSubcribeNoRecieve/abc", func(message mqtt.Message) {
|
|
t.Fatalf("recieved a message which was not meant to happen: %v", err)
|
|
})
|
|
err = client.Subscribe(ctx(), testUUID+"/TestSubcribeNoRecieve/def", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestSubcribeNoRecieve/def", "hello world", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
<-time.After(500 * time.Millisecond)
|
|
}
|
|
|
|
// TestUnsubcribe checks that a message does not get recieved after you unsubscribe
|
|
func TestUnsubcribe(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
client.Handle(testUUID+"/TestUnsubcribe", func(message mqtt.Message) {
|
|
t.Fatalf("recieved a message which was not meant to happen: %v", err)
|
|
})
|
|
err = client.Subscribe(ctx(), testUUID+"/TestUnsubcribe", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.Unsubscribe(ctx(), testUUID+"/TestUnsubcribe")
|
|
if err != nil {
|
|
t.Fatalf("unsubscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestUnsubcribe", "hello world", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
<-time.After(500 * time.Millisecond)
|
|
}
|
|
|
|
// TestRemoveRoute checks that a route can be unsubscribed from
|
|
func TestRemoveRoute(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
err = client.Connect(ctx())
|
|
defer client.DisconnectImmediately()
|
|
if err != nil {
|
|
t.Fatalf("connect should not have failed: %v", err)
|
|
}
|
|
receiver, route := client.Listen(testUUID + "/TestRemoveRoute")
|
|
err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("subscribe should not have failed: %v", err)
|
|
}
|
|
err = client.PublishString(ctx(), testUUID+"/TestRemoveRoute", "hello", mqtt.ExactlyOnce)
|
|
if err != nil {
|
|
t.Fatalf("publish should not have failed: %v", err)
|
|
}
|
|
<-receiver
|
|
route.Stop()
|
|
select {
|
|
case <-receiver:
|
|
t.Fatalf("recieved a message which was not meant to happen: %v", err)
|
|
case <-time.After(500 * time.Millisecond):
|
|
}
|
|
}
|
|
|
|
// TestEmptyRoute checks that an empty route does nothing
|
|
func TestEmptyRoute(t *testing.T) {
|
|
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
|
Servers: []string{
|
|
broker,
|
|
},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("creating client should not have failed: %v", err)
|
|
}
|
|
client.Handle(testUUID+"/TestEmptyRoute/abc", nil)
|
|
}
|