diff --git a/pkg/client/client.go b/pkg/client/client.go index 454f418..85a4c89 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -1,6 +1,8 @@ package client import ( + "net/url" + "github.com/1995parham/pakhshi/pkg/token" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -12,12 +14,16 @@ func NewClient(opts *mqtt.ClientOptions) mqtt.Client { servers := make(map[string]*mqtt.ClientOptions) for _, server := range opts.Servers { - servers[server.Host] = opts + lopts := new(mqtt.ClientOptions) + *lopts = *opts + lopts.Servers = []*url.URL{server} + servers[server.String()] = lopts } return NewClientWithOptions(servers) } +// NewClientWithOptions creates a pakhshi client based on given paho options and broker names. func NewClientWithOptions(opts map[string]*mqtt.ClientOptions) mqtt.Client { clients := make(map[string]mqtt.Client) @@ -38,7 +44,7 @@ type Client struct { } // IsConnected returns a bool signifying whether -// the client is connected or not. +// the client is connected to all mqtt brokers or not. func (c *Client) IsConnected() bool { result := true @@ -50,7 +56,7 @@ func (c *Client) IsConnected() bool { } // IsConnectionOpen return a bool signifying whether the client has an active -// connection to mqtt broker, i.e not in disconnected or reconnect mode. +// connection to all mqtt brokers, i.e not in disconnected or reconnect mode. func (c *Client) IsConnectionOpen() bool { result := true @@ -77,14 +83,24 @@ func (c *Client) Connect() mqtt.Token { // Disconnect will end the connection with the server, but not before waiting // the specified number of milliseconds to wait for existing work to be // completed. -func (*Client) Disconnect(quiesce uint) { +// Please note that this time will be given to each broker. +func (c *Client) Disconnect(quiesce uint) { + for _, client := range c.Clients { + client.Disconnect(quiesce) + } } // Publish will publish a message with the specified QoS and content // to the specified topic. // Returns a token to track delivery of the message to the broker. -func (*Client) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { - return nil +func (c *Client) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { + token := token.NewTokens() + + for name, client := range c.Clients { + token.Append(name, client.Publish(topic, qos, retained, payload)) + } + + return token } // Subscribe starts a new subscription. Provide a MessageHandler to be executed when @@ -94,8 +110,14 @@ func (*Client) Publish(topic string, qos byte, retained bool, payload interface{ // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. -func (*Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { - return nil +func (c *Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { + token := token.NewTokens() + + for name, client := range c.Clients { + token.Append(name, client.Subscribe(topic, qos, callback)) + } + + return token } // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to @@ -106,15 +128,27 @@ func (*Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) m // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. -func (*Client) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { - return nil +func (c *Client) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { + token := token.NewTokens() + + for name, client := range c.Clients { + token.Append(name, client.SubscribeMultiple(filters, callback)) + } + + return token } // Unsubscribe will end the subscription from each of the topics provided. // Messages published to those topics from other clients will no longer be // received. -func (*Client) Unsubscribe(topics ...string) mqtt.Token { - return nil +func (c *Client) Unsubscribe(topics ...string) mqtt.Token { + token := token.NewTokens() + + for name, client := range c.Clients { + token.Append(name, client.Unsubscribe(topics...)) + } + + return token } // AddRoute allows you to add a handler for messages on a specific topic @@ -126,7 +160,10 @@ func (*Client) Unsubscribe(topics ...string) mqtt.Token { // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. -func (*Client) AddRoute(topic string, callback mqtt.MessageHandler) { +func (c *Client) AddRoute(topic string, callback mqtt.MessageHandler) { + for _, client := range c.Clients { + client.AddRoute(topic, callback) + } } // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions