Added publish methods

This commit is contained in:
Luca Casonato
2019-10-20 02:21:38 +02:00
parent 0e43200f26
commit f33337953c
6 changed files with 276 additions and 21 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
coverage.out
coverage.html

9
makefile Normal file
View File

@@ -0,0 +1,9 @@
coverage:
go test -coverprofile=coverage.out
go tool cover -html=coverage.out -o coverage.html
coverage-linux: coverage
xdg-open coverage.html
coverage-macos: coverage
open coverage.html

23
mqtt.go
View File

@@ -24,6 +24,15 @@ type ClientOptions struct {
AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost
} }
// QOS describes the quality of service of an mqtt publish
type QOS byte
const (
AtMostOnce QOS = iota // Deliver at most once to every subscriber - this means message delivery is not guaranteed
AtLeastOnce // Deliver a message at least once to every subscriber
ExactlyOnce // Deliver a message exactly once to every subscriber
)
var ( var (
ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified") ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified")
) )
@@ -64,6 +73,15 @@ func NewClient(options ClientOptions) (*Client, error) {
func (c *Client) Connect(ctx context.Context) error { func (c *Client) Connect(ctx context.Context) error {
// try to connect to the client // try to connect to the client
token := c.client.Connect() token := c.client.Connect()
return tokenWithContext(ctx, token)
}
// Disconnect will immediately close the conenction with the mqtt servers
func (c *Client) DisconnectImmediately() {
c.client.Disconnect(0)
}
func tokenWithContext(ctx context.Context, token paho.Token) error {
completer := make(chan error) completer := make(chan error)
// TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes // TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes
@@ -81,8 +99,3 @@ func (c *Client) Connect(ctx context.Context) error {
} }
} }
} }
// Disconnect will immediately close the conenction with the mqtt servers
func (c *Client) DisconnectImmediately() {
c.client.Disconnect(0)
}

View File

@@ -9,7 +9,7 @@ import (
"github.com/lucacasonato/mqtt" "github.com/lucacasonato/mqtt"
) )
// create client with a nil server array // TestNewClientNilServer checks if creating a client with a nil server array works
func TestNewClientNilServer(t *testing.T) { func TestNewClientNilServer(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{}) client, err := mqtt.NewClient(mqtt.ClientOptions{})
if !errors.Is(err, mqtt.ErrMinimumOneServer) { if !errors.Is(err, mqtt.ErrMinimumOneServer) {
@@ -20,7 +20,7 @@ func TestNewClientNilServer(t *testing.T) {
} }
} }
// create client with a server array with no servers // TestNewClientNoServer checks if creating a client with a server array with no servers works
func TestNewClientNoServer(t *testing.T) { func TestNewClientNoServer(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{}, Servers: []string{},
@@ -33,7 +33,7 @@ func TestNewClientNoServer(t *testing.T) {
} }
} }
// create client with a server array with no servers // TestNewClientBasicServer checks if creating a client with a server array with one server works
func TestNewClientBasicServer(t *testing.T) { func TestNewClientBasicServer(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
@@ -48,7 +48,7 @@ func TestNewClientBasicServer(t *testing.T) {
} }
} }
// check that a client gets created and a client id is generated when it is not set // TestNewClientNoClientID checks that a client gets created and a client id is generated when it is not set
func TestNewClientNoClientID(t *testing.T) { func TestNewClientNoClientID(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
@@ -66,7 +66,7 @@ func TestNewClientNoClientID(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestNewClientHasClientID checks that a client gets created and a client id is not changed when it is already set
func TestNewClientHasClientID(t *testing.T) { func TestNewClientHasClientID(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
@@ -85,7 +85,7 @@ func TestNewClientHasClientID(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestNewClientWithAuthentication has username and password to check if those get set
func TestNewClientWithAuthentication(t *testing.T) { func TestNewClientWithAuthentication(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
@@ -102,13 +102,12 @@ func TestNewClientWithAuthentication(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestConnectSuccess just checks that connecting to a broker works
func TestConnectSuccess(t *testing.T) { func TestConnectSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
"tcp://test.mosquitto.org:1883", "tcp://test.mosquitto.org:1883",
}, },
AutoReconnect: false,
}) })
if err != nil { if err != nil {
t.Fatal("err should be nil") t.Fatal("err should be nil")
@@ -122,13 +121,12 @@ func TestConnectSuccess(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestConnectContextTimeout checks if connect errors if a context with a timeout times out
func TestConnectContextTimeout(t *testing.T) { func TestConnectContextTimeout(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
"tcp://test.mosquitto.org:1883", "tcp://test.mosquitto.org:1883",
}, },
AutoReconnect: false,
}) })
if err != nil { if err != nil {
t.Fatal("err should be nil") t.Fatal("err should be nil")
@@ -144,13 +142,12 @@ func TestConnectContextTimeout(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestConnectContextCancel checks if connect errors if a context with a cancel gets canceled
func TestConnectContextCancel(t *testing.T) { func TestConnectContextCancel(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
"tcp://test.mosquitto.org:1883", "tcp://test.mosquitto.org:1883",
}, },
AutoReconnect: false,
}) })
if err != nil { if err != nil {
t.Fatal("err should be nil") t.Fatal("err should be nil")
@@ -170,13 +167,12 @@ func TestConnectContextCancel(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestConnectFailed that a invalid client does not connect and errors
func TestConnectFailed(t *testing.T) { func TestConnectFailed(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
"tcp://test.mosquitto.org:1884", // incorrect port "tcp://test.mosquitto.org:1884", // incorrect port
}, },
AutoReconnect: false,
}) })
if err != nil { if err != nil {
t.Fatal("err should be nil") t.Fatal("err should be nil")
@@ -190,13 +186,12 @@ func TestConnectFailed(t *testing.T) {
} }
} }
// check that a client gets created and a client id is not changed when it is set // TestDisconnectImmediately immediately disconnects the mqtt broker
func TestDisconnectImmediately(t *testing.T) { func TestDisconnectImmediately(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{ client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{ Servers: []string{
"tcp://test.mosquitto.org:1883", "tcp://test.mosquitto.org:1883",
}, },
AutoReconnect: false,
}) })
if err != nil { if err != nil {
t.Fatal("err should be nil") t.Fatal("err should be nil")

41
publish.go Normal file
View File

@@ -0,0 +1,41 @@
package mqtt
import (
"context"
"encoding/json"
)
type PublishOption int
const (
Retain PublishOption = iota
)
func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error {
return c.publish(ctx, topic, payload, qos, options)
}
func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error {
return c.publish(ctx, topic, []byte(payload), qos, options)
}
func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error {
data, err := json.Marshal(payload)
if err != nil {
return err
}
return c.publish(ctx, topic, data, qos, options)
}
func (c *Client) publish(ctx context.Context, topic string, payload []byte, qos QOS, options []PublishOption) error {
var retained = false
for _, option := range options {
switch option {
case Retain:
retained = true
}
}
token := c.client.Publish(topic, byte(qos), retained, payload)
return tokenWithContext(ctx, token)
}

195
publish_test.go Normal file
View File

@@ -0,0 +1,195 @@
package mqtt_test
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
"github.com/google/uuid"
"github.com/lucacasonato/mqtt"
)
var testUUID = uuid.New().String()
// TestPublishSuccess checks that a message publish succeeds
func TestPublishSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.Publish(context.Background(), testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce)
if err != nil {
t.Fatalf("publish should not have failed: %v", err)
}
}
// TestPublishContextTimeout checks that a message publish errors if a context with a timeout times out
func TestPublishContextTimeout(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
err = client.Publish(ctx, testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("publish should have returned the error context.DeadlineExceeded")
}
}
// TestPublishContextCancelled checks that a message publish errors if a context with a cancel gets canceled
func TestPublishContextCancelled(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Microsecond)
cancel()
}()
defer cancel()
err = client.Publish(ctx, testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce)
if !errors.Is(err, context.Canceled) {
t.Fatalf("publish should have returned the error context.Canceled")
}
}
// TestPublishFailed checks that a invalid publish does not get publish but errors
func TestPublishFailed(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.Publish(context.Background(), testUUID+"/test_publish", nil, 3)
if err == nil {
t.Fatalf("publish should have failed")
}
}
// TestPublishSuccess checks that a message publish succeeds
func TestPublishSuccessRetained(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.Publish(context.Background(), testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce, mqtt.Retain)
if err != nil {
t.Fatalf("publish should not have failed: %v", err)
}
}
// TestPublisStringSuccess checks that a string message publish succeeds
func TestPublisStringSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.PublishString(context.Background(), testUUID+"/test_publish", "world", mqtt.AtLeastOnce)
if err != nil {
t.Fatalf("publish should not have failed: %v", err)
}
}
// TestPublisJSONSuccess checks that json message publish succeeds
func TestPublisJSONSuccess(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.PublishJSON(context.Background(), testUUID+"/test_publish", []string{"hello", "world"}, mqtt.AtLeastOnce)
if err != nil {
t.Fatalf("publish should not have failed: %v", err)
}
}
// TestPublisJSONFailed checks that json message fails to parse
func TestPublisJSONFailed(t *testing.T) {
client, err := mqtt.NewClient(mqtt.ClientOptions{
Servers: []string{
"tcp://test.mosquitto.org:1883",
},
})
if err != nil {
t.Fatalf("creating client should not have failed: %v", err)
}
err = client.Connect(context.Background())
defer client.DisconnectImmediately()
if err != nil {
t.Fatalf("connect should not have failed: %v", err)
}
err = client.PublishJSON(context.Background(), testUUID+"/test_publish", make(chan int), mqtt.AtLeastOnce)
if _, ok := err.(*json.UnsupportedTypeError); !ok {
t.Fatalf("publish error should be of type *json.UnsupportedTypeError: %v", err)
}
}