diff --git a/README.md b/README.md index 1cbb255..8214e11 100644 --- a/README.md +++ b/README.md @@ -85,35 +85,35 @@ if err != nil { ### subscribing ```go -err := client.Subscribe(context.WithTimeout(1 * time.Second), func(message mqtt.Message) { - fmt.Printf("recieved a message with content %v\n", message.PayloadString()) -}, "api/v0/main/client1", mqtt.AtLeastOnce) +err := client.Subscribe(context.WithTimeout(1 * time.Second), "api/v0/main/client1", mqtt.AtLeastOnce) if err != nil { panic(err) } ``` -### subscribing without listening +### handling ```go -err := client.Subscribe(context.WithTimeout(1 * time.Second), nil, "api/v0/main/client1", mqtt.AtLeastOnce) -if err != nil { - panic(err) -} -``` - -### listening without subscribing - -```go -err := client.Listen(func(message mqtt.Message) { +route := client.Handle("api/v0/main/client1", func(message mqtt.Message) { v := interface{}{} err := message.PayloadJSON(&v) if err != nil { panic(err) } fmt.Printf("recieved a message with content %v\n", v) -}, "api/v0/main/client1") -if err != nil { - panic(err) -} +}) +// once you are done with the route you can stop handling it +route.Stop() +``` + +### listening + +```go +messages, route := client.Listen("api/v0/main/client1") +for { + message := <-messages + fmt.Printf("recieved a message with content %v\n", message.PayloadString()) +} +// once you are done with the route you can stop handling it +route.Stop() ``` diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..84fcf7b --- /dev/null +++ b/example/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + "log" + "math/rand" + "time" + + "github.com/google/uuid" + "github.com/lucacasonato/mqtt" +) + +func ctx() context.Context { + cntx, _ := context.WithTimeout(context.Background(), 1*time.Second) + return cntx +} + +type Color struct { + Red uint8 `json:"red"` + Green uint8 `json:"green"` + Blue uint8 `json:"blue"` +} + +func main() { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{"tcp://localhost:1883"}, + }) + if err != nil { + log.Fatalf("failed to create mqtt client: %v\n", err) + } + + err = client.Connect(ctx()) + if err != nil { + log.Fatalf("failed to connect to mqtt server: %v\n", err) + } + + err = client.Subscribe(ctx(), "my-home-automation/lamps/#", mqtt.AtMostOnce) + if err != nil { + log.Fatalf("failed to subscribe to config service: %v\n", err) + } + + client.Handle("my-home-automation/lamps/+/color", func(m mqtt.Message) { + lampID := m.TopicVars()[0] + var color Color + err := m.PayloadJSON(&color) + if err != nil { + log.Printf("failed to parse color: %v\n", err) + return + } + log.Printf("lamp %v now has the color r: %v g: %v b: %v\n", lampID, color.Red, color.Blue, color.Green) + }) + + for { + lampID := uuid.New().String() + err := client.PublishJSON(ctx(), "my-home-automation/lamps/"+lampID+"/color", Color{ + Red: uint8(rand.Intn(255)), + Green: uint8(rand.Intn(255)), + Blue: uint8(rand.Intn(255)), + }, mqtt.AtLeastOnce) + if err != nil { + log.Printf("failed to publish: %v\n", err) + continue + } + + <-time.After(1 * time.Second) + } +} diff --git a/mqtt.go b/mqtt.go index f6061b2..ece42ab 100644 --- a/mqtt.go +++ b/mqtt.go @@ -79,7 +79,9 @@ func NewClient(options ClientOptions) (*Client, error) { pahoClient.AddRoute("#", handle(func(message Message) { routes := router.match(&message) for _, route := range routes { - route.handler(message) + m := message + m.vars = route.vars(&message) + route.handler(m) } })) diff --git a/mqtt_test.go b/mqtt_test.go index 43944fe..b20e768 100644 --- a/mqtt_test.go +++ b/mqtt_test.go @@ -115,7 +115,7 @@ func TestConnectSuccess(t *testing.T) { if client == nil { t.Fatal("client should not be nil") } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err != nil { t.Fatal("connect should not have failed") } @@ -134,7 +134,7 @@ func TestConnectContextTimeout(t *testing.T) { if client == nil { t.Fatal("client should not be nil") } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + ctx, cancel := context.WithTimeout(ctx(), 1*time.Nanosecond) defer cancel() err = client.Connect(ctx) if !errors.Is(err, context.DeadlineExceeded) { @@ -155,7 +155,7 @@ func TestConnectContextCancel(t *testing.T) { if client == nil { t.Fatal("client should not be nil") } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx()) go func() { time.Sleep(1 * time.Microsecond) cancel() @@ -180,7 +180,7 @@ func TestConnectFailed(t *testing.T) { if client == nil { t.Fatal("client should not be nil") } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err == nil { t.Fatal("connect should have failed") } @@ -199,7 +199,7 @@ func TestDisconnectImmediately(t *testing.T) { if client == nil { t.Fatal("client should not be nil") } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err != nil { t.Fatal("connect should not have failed") } diff --git a/publish_test.go b/publish_test.go index ba79a5f..3ecd8e1 100644 --- a/publish_test.go +++ b/publish_test.go @@ -23,13 +23,13 @@ func TestPublishSuccess(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.Publish(context.Background(), testUUID+"/TestPublishSuccess", []byte("hello"), mqtt.AtLeastOnce) + err = client.Publish(ctx(), testUUID+"/TestPublishSuccess", []byte("hello"), mqtt.AtLeastOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -46,11 +46,11 @@ func TestPublishContextTimeout(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err != nil { t.Fatalf("connect should not have failed: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + ctx, cancel := context.WithTimeout(ctx(), 1*time.Nanosecond) defer cancel() err = client.Publish(ctx, testUUID+"/TestPublishContextTimeout", []byte("hello"), mqtt.AtLeastOnce) if !errors.Is(err, context.DeadlineExceeded) { @@ -69,11 +69,11 @@ func TestPublishContextCancelled(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err != nil { t.Fatalf("connect should not have failed: %v", err) } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx()) go func() { time.Sleep(1 * time.Microsecond) cancel() @@ -96,11 +96,11 @@ func TestPublishFailed(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.Publish(context.Background(), testUUID+"/TestPublishFailed", nil, 3) + err = client.Publish(ctx(), testUUID+"/TestPublishFailed", nil, 3) if err == nil { t.Fatalf("publish should have failed") } @@ -116,13 +116,13 @@ func TestPublishSuccessRetained(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.Publish(context.Background(), testUUID+"/TestPublishSuccessRetained", []byte("hello"), mqtt.AtLeastOnce, mqtt.Retain) + err = client.Publish(ctx(), testUUID+"/TestPublishSuccessRetained", []byte("hello"), mqtt.AtLeastOnce, mqtt.Retain) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -138,13 +138,13 @@ func TestPublisStringSuccess(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.PublishString(context.Background(), testUUID+"/TestPublisStringSuccess", "world", mqtt.AtLeastOnce) + err = client.PublishString(ctx(), testUUID+"/TestPublisStringSuccess", "world", mqtt.AtLeastOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -160,13 +160,13 @@ func TestPublisJSONSuccess(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.PublishJSON(context.Background(), testUUID+"/TestPublisJSONSuccess", []string{"hello", "world"}, mqtt.AtLeastOnce) + err = client.PublishJSON(ctx(), testUUID+"/TestPublisJSONSuccess", []string{"hello", "world"}, mqtt.AtLeastOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -182,13 +182,13 @@ func TestPublisJSONFailed(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.PublishJSON(context.Background(), testUUID+"/TestPublisJSONFailed", make(chan int), mqtt.AtLeastOnce) + err = client.PublishJSON(ctx(), testUUID+"/TestPublisJSONFailed", make(chan int), mqtt.AtLeastOnce) if _, ok := err.(*json.UnsupportedTypeError); !ok { t.Fatalf("publish error should be of type *json.UnsupportedTypeError: %v", err) } diff --git a/router.go b/router.go index 793ba34..00b3f1e 100644 --- a/router.go +++ b/router.go @@ -3,22 +3,30 @@ package mqtt import ( "strings" "sync" + + "github.com/google/uuid" ) type router struct { - routes []route + routes []Route lock sync.RWMutex } func newRouter() *router { - return &router{routes: []route{}, lock: sync.RWMutex{}} + return &router{routes: []Route{}, lock: sync.RWMutex{}} } -type route struct { +type Route struct { + router *router + id string topic string handler MessageHandler } +func newRoute(router *router, topic string, handler MessageHandler) Route { + return Route{router: router, id: uuid.New().String(), topic: topic, handler: handler} +} + func match(route []string, topic []string) bool { if len(route) == 0 { return len(topic) == 0 @@ -52,20 +60,54 @@ func routeSplit(route string) []string { return result } -func (r *route) match(message *Message) bool { +func (r *Route) match(message *Message) bool { return r.topic == message.Topic() || routeIncludesTopic(r.topic, message.Topic()) } -func (r *router) addRoute(topic string, handler MessageHandler) { - if handler != nil { - r.lock.Lock() - r.routes = append(r.routes, route{topic: topic, handler: handler}) - r.lock.Unlock() +func (r *Route) vars(message *Message) []string { + var vars []string + route := routeSplit(r.topic) + topic := strings.Split(message.Topic(), "/") + + for i, section := range route { + if section == "+" { + if len(topic) > i { + vars = append(vars, topic[i]) + } + } else if section == "#" { + if len(topic) > i { + vars = append(vars, topic[i:]...) + } + } } + + return vars } -func (r *router) match(message *Message) []route { - routes := []route{} +func (r *router) addRoute(topic string, handler MessageHandler) Route { + if handler != nil { + route := newRoute(r, topic, handler) + r.lock.Lock() + r.routes = append(r.routes, route) + r.lock.Unlock() + return route + } + return Route{router: r} +} + +func (r *router) removeRoute(removeRoute *Route) { + r.lock.Lock() + for i, route := range r.routes { + if route.id == removeRoute.id { + r.routes[i] = r.routes[len(r.routes)-1] + r.routes = r.routes[:len(r.routes)-1] + } + } + r.lock.Unlock() +} + +func (r *router) match(message *Message) []Route { + routes := []Route{} r.lock.RLock() for _, route := range r.routes { if route.match(message) { @@ -75,3 +117,8 @@ func (r *router) match(message *Message) []route { r.lock.RUnlock() return routes } + +// Stop removes this route from the router and stops matching it +func (r *Route) Stop() { + r.router.removeRoute(r) +} diff --git a/subscribe.go b/subscribe.go index f2b7e71..ba1ff14 100644 --- a/subscribe.go +++ b/subscribe.go @@ -9,10 +9,15 @@ import ( type Message struct { message paho.Message + vars []string } type MessageHandler func(Message) +func (m *Message) TopicVars() []string { + return m.vars +} + func (m *Message) Topic() string { return m.message.Topic() } @@ -41,18 +46,26 @@ func (m *Message) PayloadJSON(v interface{}) error { return json.Unmarshal(m.message.Payload(), v) } -func (c *Client) Listen(handler MessageHandler, topics ...string) { - for _, topic := range topics { - c.router.addRoute(topic, handler) - } +func (c *Client) Handle(topic string, handler MessageHandler) Route { + return c.router.addRoute(topic, handler) } -func (c *Client) Subscribe(ctx context.Context, handler MessageHandler, topic string, qos QOS) error { +func (c *Client) Listen(topic string) (chan Message, Route) { + queue := make(chan Message) + route := c.router.addRoute(topic, func(message Message) { + queue <- message + }) + return queue, route +} + +func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error { token := c.client.Subscribe(topic, byte(qos), nil) err := tokenWithContext(ctx, token) - if err != nil { - return err - } - c.router.addRoute(topic, handler) - return nil + return err +} + +func (c *Client) Unsubscribe(ctx context.Context, topic string) error { + token := c.client.Unsubscribe(topic) + err := tokenWithContext(ctx, token) + return err } diff --git a/subscribe_test.go b/subscribe_test.go index 50b936a..3b4b85c 100644 --- a/subscribe_test.go +++ b/subscribe_test.go @@ -8,6 +8,11 @@ import ( "github.com/lucacasonato/mqtt" ) +func ctx() context.Context { + c, _ := context.WithTimeout(context.Background(), 1*time.Second) + return c +} + // TestSubcribeSuccess checks that a message gets recieved correctly func TestSubcribeSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ @@ -18,20 +23,21 @@ func TestSubcribeSuccess(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } reciever := make(chan mqtt.Message) - err = client.Subscribe(context.Background(), func(message mqtt.Message) { + err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccess/#", mqtt.ExactlyOnce) + client.Handle(testUUID+"/TestSubcribeSuccess/#", func(message mqtt.Message) { reciever <- message - }, testUUID+"/TestSubcribeSuccess", mqtt.ExactlyOnce) + }) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) } - err = client.PublishString(context.Background(), testUUID+"/TestSubcribeSuccess", "[1, 2]", mqtt.ExactlyOnce) + err = client.PublishString(ctx(), testUUID+"/TestSubcribeSuccess/abc", "[1, 2]", mqtt.ExactlyOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -50,8 +56,8 @@ func TestSubcribeSuccess(t *testing.T) { if len(v) != 2 || v[0] != 1 || v[1] != 2 { t.Fatalf("message payload should have been []int{1, 2} but is %v", v) } - if message.Topic() != testUUID+"/TestSubcribeSuccess" { - t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeSuccess", message.Topic()) + if message.Topic() != testUUID+"/TestSubcribeSuccess/abc" { + t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeSuccess/abc", message.Topic()) } if message.QOS() != mqtt.ExactlyOnce { t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS()) @@ -59,6 +65,11 @@ func TestSubcribeSuccess(t *testing.T) { if message.IsDuplicate() != false { t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate()) } + vars := message.TopicVars() + if len(vars) != 1 && vars[0] != "abc" { + t.Fatalf("message TopicVars should be ['abc'] but is %v", vars) + } + message.Acknowledge() } @@ -72,20 +83,17 @@ func TestListenSuccess(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - reciever := make(chan mqtt.Message) - err = client.Subscribe(context.Background(), func(message mqtt.Message) {}, testUUID+"/TestListenSuccess", mqtt.ExactlyOnce) + reciever, _ := client.Listen(testUUID + "/TestListenSuccess") + err = client.Subscribe(ctx(), testUUID+"/TestListenSuccess", mqtt.ExactlyOnce) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) } - client.Listen(func(message mqtt.Message) { - reciever <- message - }, testUUID+"/TestListenSuccess") - err = client.PublishString(context.Background(), testUUID+"/TestListenSuccess", "hello", mqtt.ExactlyOnce) + err = client.PublishString(ctx(), testUUID+"/TestListenSuccess", "hello", mqtt.ExactlyOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -105,12 +113,12 @@ func TestSubcribeFailure(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - err = client.Subscribe(context.Background(), func(message mqtt.Message) {}, testUUID+"/#/test_publish", mqtt.ExactlyOnce) // # in the middle of a subscribe is not allowed + err = client.Subscribe(ctx(), testUUID+"/#/test_publish", mqtt.ExactlyOnce) // # in the middle of a subscribe is not allowed if err == nil { t.Fatalf("subscribe should have failed: %v", err) } @@ -126,19 +134,20 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } reciever := make(chan mqtt.Message) - err = client.Subscribe(context.Background(), func(message mqtt.Message) { + client.Handle(testUUID+"/TestSubcribeSuccessAdvancedRouting/#", func(message mqtt.Message) { reciever <- message - }, testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce) + }) + err = client.Subscribe(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/#", mqtt.ExactlyOnce) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) } - err = client.PublishString(context.Background(), testUUID+"/TestSubcribeSuccessAdvancedRouting/abc", "hello world", mqtt.ExactlyOnce) + err = client.PublishString(ctx(), testUUID+"/TestSubcribeSuccessAdvancedRouting/abc", "hello world", mqtt.ExactlyOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } @@ -148,7 +157,7 @@ func TestSubcribeSuccessAdvancedRouting(t *testing.T) { } } -// TestSubcribeSuccess checks that a message gets recieved correctly +// TestSubcribeNoRecieve checks that a message does not get recieved when it is not listening func TestSubcribeNoRecieve(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ @@ -158,21 +167,100 @@ func TestSubcribeNoRecieve(t *testing.T) { if err != nil { t.Fatalf("creating client should not have failed: %v", err) } - err = client.Connect(context.Background()) + err = client.Connect(ctx()) defer client.DisconnectImmediately() if err != nil { t.Fatalf("connect should not have failed: %v", err) } - client.Listen(func(message mqtt.Message) { + client.Handle(testUUID+"/TestSubcribeNoRecieve/abc", func(message mqtt.Message) { t.Fatalf("recieved a message which was not meant to happen: %v", err) - }, testUUID+"/TestSubcribeSuccessAdvancedRouting/abc") - err = client.Subscribe(context.Background(), nil, testUUID+"/TestSubcribeSuccessAdvancedRouting/def", mqtt.ExactlyOnce) + }) + err = client.Subscribe(ctx(), testUUID+"/TestSubcribeNoRecieve/def", mqtt.ExactlyOnce) if err != nil { t.Fatalf("subscribe should not have failed: %v", err) } - err = client.PublishString(context.Background(), testUUID+"/TestSubcribeSuccessAdvancedRouting/def", "hello world", mqtt.ExactlyOnce) + err = client.PublishString(ctx(), testUUID+"/TestSubcribeNoRecieve/def", "hello world", mqtt.ExactlyOnce) if err != nil { t.Fatalf("publish should not have failed: %v", err) } <-time.After(500 * time.Millisecond) } + +// TestUnsubcribe checks that a message does not get recieved after you unsubscribe +func TestUnsubcribe(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(ctx()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + client.Handle(testUUID+"/TestUnsubcribe", func(message mqtt.Message) { + t.Fatalf("recieved a message which was not meant to happen: %v", err) + }) + err = client.Subscribe(ctx(), testUUID+"/TestUnsubcribe", mqtt.ExactlyOnce) + if err != nil { + t.Fatalf("subscribe should not have failed: %v", err) + } + err = client.Unsubscribe(ctx(), testUUID+"/TestUnsubcribe") + if err != nil { + t.Fatalf("unsubscribe should not have failed: %v", err) + } + err = client.PublishString(ctx(), testUUID+"/TestUnsubcribe", "hello world", mqtt.ExactlyOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } + <-time.After(500 * time.Millisecond) +} + +// TestRemoveRoute checks that a route can be unsubscribed from +func TestRemoveRoute(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(ctx()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + reciever, route := client.Listen(testUUID + "/TestRemoveRoute") + err = client.Subscribe(ctx(), testUUID+"/TestRemoveRoute", mqtt.ExactlyOnce) + if err != nil { + t.Fatalf("subscribe should not have failed: %v", err) + } + err = client.PublishString(ctx(), testUUID+"/TestRemoveRoute", "hello", mqtt.ExactlyOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } + <-reciever + route.Stop() + select { + case <-reciever: + t.Fatalf("recieved a message which was not meant to happen: %v", err) + case <-time.After(500 * time.Millisecond): + } +} + +// TestEmptyRoute checks that an empty route does nothing +func TestEmptyRoute(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + client.Handle(testUUID+"/TestEmptyRoute/abc", nil) +}