mirror of
https://github.com/lucacasonato/mqtt.git
synced 2025-09-26 19:01:12 +08:00
Subscribe multiple
This commit is contained in:
11
subscribe.go
11
subscribe.go
@@ -80,6 +80,17 @@ func (c *Client) Subscribe(ctx context.Context, topic string, qos QOS) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// SubscribeMultiple subscribes to multiple topics and errors if this fails.
|
||||
func (c *Client) SubscribeMultiple(ctx context.Context, subscriptions map[string]QOS) error {
|
||||
subs := make(map[string]byte, len(subscriptions))
|
||||
for topic, qos := range subscriptions {
|
||||
subs[topic] = byte(qos)
|
||||
}
|
||||
token := c.client.SubscribeMultiple(subs, nil)
|
||||
err := tokenWithContext(ctx, token)
|
||||
return err
|
||||
}
|
||||
|
||||
// Unsubscribe unsubscribes from a certain topic and errors if this fails.
|
||||
func (c *Client) Unsubscribe(ctx context.Context, topic string) error {
|
||||
token := c.client.Unsubscribe(topic)
|
||||
|
@@ -73,6 +73,66 @@ func TestSubcribeSuccess(t *testing.T) {
|
||||
message.Acknowledge()
|
||||
}
|
||||
|
||||
// TestSubcribeMultipleSuccess checks that a message gets recieved correctly
|
||||
func TestSubcribeMultipleSuccess(t *testing.T) {
|
||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||
Servers: []string{
|
||||
"tcp://test.mosquitto.org:1883",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("creating client should not have failed: %v", err)
|
||||
}
|
||||
err = client.Connect(ctx())
|
||||
defer client.DisconnectImmediately()
|
||||
if err != nil {
|
||||
t.Fatalf("connect should not have failed: %v", err)
|
||||
}
|
||||
|
||||
receiver := make(chan mqtt.Message)
|
||||
err = client.SubscribeMultiple(ctx(), map[string]mqtt.QOS{testUUID + "/TestSubcribeMultipleSuccess/#": mqtt.ExactlyOnce})
|
||||
client.Handle(testUUID+"/TestSubcribeMultipleSuccess/#", func(message mqtt.Message) {
|
||||
receiver <- message
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("subscribe should not have failed: %v", err)
|
||||
}
|
||||
err = client.PublishString(ctx(), testUUID+"/TestSubcribeMultipleSuccess/abc", "[1, 2]", mqtt.ExactlyOnce)
|
||||
if err != nil {
|
||||
t.Fatalf("publish should not have failed: %v", err)
|
||||
}
|
||||
message := <-receiver
|
||||
if string(message.Payload()) != "[1, 2]" {
|
||||
t.Fatalf("message payload should have been byte array '%v' but is %v", []byte("[1, 2]"), message.Payload())
|
||||
}
|
||||
if message.PayloadString() != "[1, 2]" {
|
||||
t.Fatalf("message payload should have been '[1, 2]' but is %v", message.PayloadString())
|
||||
}
|
||||
v := []int{}
|
||||
err = message.PayloadJSON(&v)
|
||||
if err != nil {
|
||||
t.Fatalf("json should have unmarshalled: %v", err)
|
||||
}
|
||||
if len(v) != 2 || v[0] != 1 || v[1] != 2 {
|
||||
t.Fatalf("message payload should have been []int{1, 2} but is %v", v)
|
||||
}
|
||||
if message.Topic() != testUUID+"/TestSubcribeMultipleSuccess/abc" {
|
||||
t.Fatalf("message topic should be %v but is %v", testUUID+"/TestSubcribeMultipleSuccess/abc", message.Topic())
|
||||
}
|
||||
if message.QOS() != mqtt.ExactlyOnce {
|
||||
t.Fatalf("message qos should be mqtt.ExactlyOnce but is %v", message.QOS())
|
||||
}
|
||||
if message.IsDuplicate() != false {
|
||||
t.Fatalf("message IsDuplicate should be false but is %v", message.IsDuplicate())
|
||||
}
|
||||
vars := message.TopicVars()
|
||||
if len(vars) != 1 && vars[0] != "abc" {
|
||||
t.Fatalf("message TopicVars should be ['abc'] but is %v", vars)
|
||||
}
|
||||
|
||||
message.Acknowledge()
|
||||
}
|
||||
|
||||
// TestListenSuccess checks that a listener recieves a message correctly
|
||||
func TestListenSuccess(t *testing.T) {
|
||||
client, err := mqtt.NewClient(mqtt.ClientOptions{
|
||||
|
Reference in New Issue
Block a user