diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ce4af3f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +coverage.out +coverage.html \ No newline at end of file diff --git a/makefile b/makefile new file mode 100644 index 0000000..741daa5 --- /dev/null +++ b/makefile @@ -0,0 +1,9 @@ +coverage: + go test -coverprofile=coverage.out + go tool cover -html=coverage.out -o coverage.html + +coverage-linux: coverage + xdg-open coverage.html + +coverage-macos: coverage + open coverage.html \ No newline at end of file diff --git a/mqtt.go b/mqtt.go index a7f5b6c..da878b6 100644 --- a/mqtt.go +++ b/mqtt.go @@ -24,6 +24,15 @@ type ClientOptions struct { AutoReconnect bool // If the client should automatically try to reconnect when the connection is lost } +// QOS describes the quality of service of an mqtt publish +type QOS byte + +const ( + AtMostOnce QOS = iota // Deliver at most once to every subscriber - this means message delivery is not guaranteed + AtLeastOnce // Deliver a message at least once to every subscriber + ExactlyOnce // Deliver a message exactly once to every subscriber +) + var ( ErrMinimumOneServer = errors.New("mqtt: at least one server needs to be specified") ) @@ -64,6 +73,15 @@ func NewClient(options ClientOptions) (*Client, error) { func (c *Client) Connect(ctx context.Context) error { // try to connect to the client token := c.client.Connect() + return tokenWithContext(ctx, token) +} + +// Disconnect will immediately close the conenction with the mqtt servers +func (c *Client) DisconnectImmediately() { + c.client.Disconnect(0) +} + +func tokenWithContext(ctx context.Context, token paho.Token) error { completer := make(chan error) // TODO: This go routine will not be removed up if the ctx is cancelled or a the ctx timeout passes @@ -81,8 +99,3 @@ func (c *Client) Connect(ctx context.Context) error { } } } - -// Disconnect will immediately close the conenction with the mqtt servers -func (c *Client) DisconnectImmediately() { - c.client.Disconnect(0) -} diff --git a/mqtt_test.go b/mqtt_test.go index cf64ca3..43944fe 100644 --- a/mqtt_test.go +++ b/mqtt_test.go @@ -9,7 +9,7 @@ import ( "github.com/lucacasonato/mqtt" ) -// create client with a nil server array +// TestNewClientNilServer checks if creating a client with a nil server array works func TestNewClientNilServer(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{}) if !errors.Is(err, mqtt.ErrMinimumOneServer) { @@ -20,7 +20,7 @@ func TestNewClientNilServer(t *testing.T) { } } -// create client with a server array with no servers +// TestNewClientNoServer checks if creating a client with a server array with no servers works func TestNewClientNoServer(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{}, @@ -33,7 +33,7 @@ func TestNewClientNoServer(t *testing.T) { } } -// create client with a server array with no servers +// TestNewClientBasicServer checks if creating a client with a server array with one server works func TestNewClientBasicServer(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ @@ -48,7 +48,7 @@ func TestNewClientBasicServer(t *testing.T) { } } -// check that a client gets created and a client id is generated when it is not set +// TestNewClientNoClientID checks that a client gets created and a client id is generated when it is not set func TestNewClientNoClientID(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ @@ -66,7 +66,7 @@ func TestNewClientNoClientID(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestNewClientHasClientID checks that a client gets created and a client id is not changed when it is already set func TestNewClientHasClientID(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ @@ -85,7 +85,7 @@ func TestNewClientHasClientID(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestNewClientWithAuthentication has username and password to check if those get set func TestNewClientWithAuthentication(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ @@ -102,13 +102,12 @@ func TestNewClientWithAuthentication(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestConnectSuccess just checks that connecting to a broker works func TestConnectSuccess(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ "tcp://test.mosquitto.org:1883", }, - AutoReconnect: false, }) if err != nil { t.Fatal("err should be nil") @@ -122,13 +121,12 @@ func TestConnectSuccess(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestConnectContextTimeout checks if connect errors if a context with a timeout times out func TestConnectContextTimeout(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ "tcp://test.mosquitto.org:1883", }, - AutoReconnect: false, }) if err != nil { t.Fatal("err should be nil") @@ -144,13 +142,12 @@ func TestConnectContextTimeout(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestConnectContextCancel checks if connect errors if a context with a cancel gets canceled func TestConnectContextCancel(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ "tcp://test.mosquitto.org:1883", }, - AutoReconnect: false, }) if err != nil { t.Fatal("err should be nil") @@ -170,13 +167,12 @@ func TestConnectContextCancel(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestConnectFailed that a invalid client does not connect and errors func TestConnectFailed(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ "tcp://test.mosquitto.org:1884", // incorrect port }, - AutoReconnect: false, }) if err != nil { t.Fatal("err should be nil") @@ -190,13 +186,12 @@ func TestConnectFailed(t *testing.T) { } } -// check that a client gets created and a client id is not changed when it is set +// TestDisconnectImmediately immediately disconnects the mqtt broker func TestDisconnectImmediately(t *testing.T) { client, err := mqtt.NewClient(mqtt.ClientOptions{ Servers: []string{ "tcp://test.mosquitto.org:1883", }, - AutoReconnect: false, }) if err != nil { t.Fatal("err should be nil") diff --git a/publish.go b/publish.go new file mode 100644 index 0000000..2f0d0a8 --- /dev/null +++ b/publish.go @@ -0,0 +1,41 @@ +package mqtt + +import ( + "context" + "encoding/json" +) + +type PublishOption int + +const ( + Retain PublishOption = iota +) + +func (c *Client) Publish(ctx context.Context, topic string, payload []byte, qos QOS, options ...PublishOption) error { + return c.publish(ctx, topic, payload, qos, options) +} + +func (c *Client) PublishString(ctx context.Context, topic string, payload string, qos QOS, options ...PublishOption) error { + return c.publish(ctx, topic, []byte(payload), qos, options) +} + +func (c *Client) PublishJSON(ctx context.Context, topic string, payload interface{}, qos QOS, options ...PublishOption) error { + data, err := json.Marshal(payload) + if err != nil { + return err + } + return c.publish(ctx, topic, data, qos, options) +} + +func (c *Client) publish(ctx context.Context, topic string, payload []byte, qos QOS, options []PublishOption) error { + var retained = false + for _, option := range options { + switch option { + case Retain: + retained = true + } + } + + token := c.client.Publish(topic, byte(qos), retained, payload) + return tokenWithContext(ctx, token) +} diff --git a/publish_test.go b/publish_test.go new file mode 100644 index 0000000..ec0a3ae --- /dev/null +++ b/publish_test.go @@ -0,0 +1,195 @@ +package mqtt_test + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/lucacasonato/mqtt" +) + +var testUUID = uuid.New().String() + +// TestPublishSuccess checks that a message publish succeeds +func TestPublishSuccess(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + err = client.Publish(context.Background(), testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } +} + +// TestPublishContextTimeout checks that a message publish errors if a context with a timeout times out +func TestPublishContextTimeout(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + defer cancel() + err = client.Publish(ctx, testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("publish should have returned the error context.DeadlineExceeded") + } +} + +// TestPublishContextCancelled checks that a message publish errors if a context with a cancel gets canceled +func TestPublishContextCancelled(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(1 * time.Microsecond) + cancel() + }() + defer cancel() + err = client.Publish(ctx, testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce) + if !errors.Is(err, context.Canceled) { + t.Fatalf("publish should have returned the error context.Canceled") + } +} + +// TestPublishFailed checks that a invalid publish does not get publish but errors +func TestPublishFailed(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + err = client.Publish(context.Background(), testUUID+"/test_publish", nil, 3) + if err == nil { + t.Fatalf("publish should have failed") + } +} + +// TestPublishSuccess checks that a message publish succeeds +func TestPublishSuccessRetained(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + err = client.Publish(context.Background(), testUUID+"/test_publish", []byte("hello"), mqtt.AtLeastOnce, mqtt.Retain) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } +} + +// TestPublisStringSuccess checks that a string message publish succeeds +func TestPublisStringSuccess(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + err = client.PublishString(context.Background(), testUUID+"/test_publish", "world", mqtt.AtLeastOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } +} + +// TestPublisJSONSuccess checks that json message publish succeeds +func TestPublisJSONSuccess(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + err = client.PublishJSON(context.Background(), testUUID+"/test_publish", []string{"hello", "world"}, mqtt.AtLeastOnce) + if err != nil { + t.Fatalf("publish should not have failed: %v", err) + } +} + +// TestPublisJSONFailed checks that json message fails to parse +func TestPublisJSONFailed(t *testing.T) { + client, err := mqtt.NewClient(mqtt.ClientOptions{ + Servers: []string{ + "tcp://test.mosquitto.org:1883", + }, + }) + if err != nil { + t.Fatalf("creating client should not have failed: %v", err) + } + err = client.Connect(context.Background()) + defer client.DisconnectImmediately() + if err != nil { + t.Fatalf("connect should not have failed: %v", err) + } + + err = client.PublishJSON(context.Background(), testUUID+"/test_publish", make(chan int), mqtt.AtLeastOnce) + if _, ok := err.(*json.UnsupportedTypeError); !ok { + t.Fatalf("publish error should be of type *json.UnsupportedTypeError: %v", err) + } +}