reuse mq connections

This commit is contained in:
Matthew R. Kasun
2022-08-29 14:08:01 -04:00
parent a7ff340692
commit 49c6380643
7 changed files with 53 additions and 63 deletions

View File

@@ -171,7 +171,7 @@ func runMessageQueue(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
brokerHost, secure := servercfg.GetMessageQueueEndpoint() brokerHost, secure := servercfg.GetMessageQueueEndpoint()
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure)) logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
var client = mq.SetupMQTT(false) // Set up the subscription listener mq.SetupMQTT()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go mq.Keepalive(ctx) go mq.Keepalive(ctx)
go logic.ManageZombies(ctx) go logic.ManageZombies(ctx)
@@ -180,7 +180,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
<-quit <-quit
cancel() cancel()
logger.Log(0, "Message Queue shutting down") logger.Log(0, "Message Queue shutting down")
client.Disconnect(250)
} }
func setVerbosity() { func setVerbosity() {

View File

@@ -21,8 +21,10 @@ const MQ_TIMEOUT = 30
var peer_force_send = 0 var peer_force_send = 0
var mqclient mqtt.Client
// SetupMQTT creates a connection to broker and return client // SetupMQTT creates a connection to broker and return client
func SetupMQTT(publish bool) mqtt.Client { func SetupMQTT() {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
broker, secure := servercfg.GetMessageQueueEndpoint() broker, secure := servercfg.GetMessageQueueEndpoint()
opts.AddBroker(broker) opts.AddBroker(broker)
@@ -37,28 +39,26 @@ func SetupMQTT(publish bool) mqtt.Client {
opts.SetKeepAlive(time.Minute) opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute) opts.SetWriteTimeout(time.Minute)
opts.SetOnConnectHandler(func(client mqtt.Client) { opts.SetOnConnectHandler(func(client mqtt.Client) {
if !publish { if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240)
client.Disconnect(240) logger.Log(0, "ping subscription failed")
logger.Log(0, "ping subscription failed")
}
if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node update subscription failed")
}
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node client subscription failed")
}
opts.SetOrderMatters(true)
opts.SetResumeSubs(true)
} }
if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node update subscription failed")
}
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node client subscription failed")
}
opts.SetOrderMatters(true)
opts.SetResumeSubs(true)
}) })
client := mqtt.NewClient(opts) mqclient := mqtt.NewClient(opts)
tperiod := time.Now().Add(10 * time.Second) tperiod := time.Now().Add(10 * time.Second)
for { for {
if token := client.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
logger.Log(2, "unable to connect to broker, retrying ...") logger.Log(2, "unable to connect to broker, retrying ...")
if time.Now().After(tperiod) { if time.Now().After(tperiod) {
if token.Error() == nil { if token.Error() == nil {
@@ -72,10 +72,6 @@ func SetupMQTT(publish bool) mqtt.Client {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
if !publish {
logger.Log(0, "successfully connected to mq broker")
}
return client
} }
// Keepalive -- periodically pings all nodes to let them know server is still alive and doing well // Keepalive -- periodically pings all nodes to let them know server is still alive and doing well

View File

@@ -61,13 +61,14 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
} }
func publish(node *models.Node, dest string, msg []byte) error { func publish(node *models.Node, dest string, msg []byte) error {
client := SetupMQTT(true)
defer client.Disconnect(250)
encrypted, encryptErr := encryptMsg(node, msg) encrypted, encryptErr := encryptMsg(node, msg)
if encryptErr != nil { if encryptErr != nil {
return encryptErr return encryptErr
} }
if token := client.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { if mqclient == nil {
return errors.New("cannot publish ... mqclient not connected")
}
if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
var err error var err error
if token.Error() == nil { if token.Error() == nil {
err = errors.New("connection timeout") err = errors.New("connection timeout")
@@ -79,7 +80,7 @@ func publish(node *models.Node, dest string, msg []byte) error {
return nil return nil
} }
// decodes a message queue topic and returns the embedded node.ID // decodes a message queue topic and returns the embedded node.ID
func getID(topic string) (string, error) { func getID(topic string) (string, error) {
parts := strings.Split(topic, "/") parts := strings.Split(topic, "/")
count := len(parts) count := len(parts)

View File

@@ -34,6 +34,8 @@ var messageCache = new(sync.Map)
var serverSet map[string]bool var serverSet map[string]bool
var mqclient mqtt.Client
const lastNodeUpdate = "lnu" const lastNodeUpdate = "lnu"
const lastPeerUpdate = "lpu" const lastPeerUpdate = "lpu"
@@ -192,12 +194,12 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) { func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) {
defer wg.Done() defer wg.Done()
logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server) logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server)
client, err := setupMQTT(cfg, false) err := setupMQTT(cfg)
if err != nil { if err != nil {
logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error()) logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error())
return return
} }
defer client.Disconnect(250) //defer mqclient.Disconnect(250)
<-ctx.Done() <-ctx.Done()
logger.Log(0, "shutting down message queue for server", cfg.Server.Server) logger.Log(0, "shutting down message queue for server", cfg.Server.Server)
} }
@@ -232,7 +234,7 @@ func NewTLSConfig(server string) (*tls.Config, error) {
// setupMQTT creates a connection to broker and returns client // setupMQTT creates a connection to broker and returns client
// this function is primarily used to create a connection to publish to the broker // this function is primarily used to create a connection to publish to the broker
func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { func setupMQTT(cfg *config.ClientConfig) error {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
server := cfg.Server.Server server := cfg.Server.Server
port := cfg.Server.MQPort port := cfg.Server.MQPort
@@ -240,7 +242,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
tlsConfig, err := NewTLSConfig(server) tlsConfig, err := NewTLSConfig(server)
if err != nil { if err != nil {
logger.Log(0, "failed to get TLS config for", server, err.Error()) logger.Log(0, "failed to get TLS config for", server, err.Error())
return nil, err return err
} }
opts.SetTLSConfig(tlsConfig) opts.SetTLSConfig(tlsConfig)
opts.SetClientID(ncutils.MakeRandomString(23)) opts.SetClientID(ncutils.MakeRandomString(23))
@@ -252,17 +254,15 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
opts.SetWriteTimeout(time.Minute) opts.SetWriteTimeout(time.Minute)
opts.SetOnConnectHandler(func(client mqtt.Client) { opts.SetOnConnectHandler(func(client mqtt.Client) {
if !publish { networks, err := ncutils.GetSystemNetworks()
networks, err := ncutils.GetSystemNetworks() if err != nil {
if err != nil { logger.Log(0, "error retriving networks", err.Error())
logger.Log(0, "error retriving networks", err.Error()) }
} for _, network := range networks {
for _, network := range networks { var currNodeCfg config.ClientConfig
var currNodeCfg config.ClientConfig currNodeCfg.Network = network
currNodeCfg.Network = network currNodeCfg.ReadConfig()
currNodeCfg.ReadConfig() setSubscriptions(client, &currNodeCfg)
setSubscriptions(client, &currNodeCfg)
}
} }
}) })
opts.SetOrderMatters(true) opts.SetOrderMatters(true)
@@ -270,11 +270,12 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
logger.Log(0, "network:", cfg.Node.Network, "detected broker connection lost for", cfg.Server.Server) logger.Log(0, "network:", cfg.Node.Network, "detected broker connection lost for", cfg.Server.Server)
}) })
client := mqtt.NewClient(opts) mqclient = mqtt.NewClient(opts)
log.Println(mqclient)
var connecterr error var connecterr error
for count := 0; count < 3; count++ { for count := 0; count < 3; count++ {
connecterr = nil connecterr = nil
if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
logger.Log(0, "unable to connect to broker, retrying ...") logger.Log(0, "unable to connect to broker, retrying ...")
if token.Error() == nil { if token.Error() == nil {
connecterr = errors.New("connect timeout") connecterr = errors.New("connect timeout")
@@ -289,12 +290,12 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
if connecterr != nil { if connecterr != nil {
reRegisterWithServer(cfg) reRegisterWithServer(cfg)
//try after re-registering //try after re-registering
if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
return client, errors.New("unable to connect to broker") return errors.New("unable to connect to broker")
} }
} }
return client, nil return nil
} }
func reRegisterWithServer(cfg *config.ClientConfig) { func reRegisterWithServer(cfg *config.ClientConfig) {

View File

@@ -218,11 +218,6 @@ func JoinNetwork(cfg *config.ClientConfig, privateKey string) error {
if cfg.Server.Server == "" { if cfg.Server.Server == "" {
return errors.New("did not receive broker address from registration") return errors.New("did not receive broker address from registration")
} }
// update server with latest data
if err := PublishNodeUpdate(cfg); err != nil {
logger.Log(0, "network:", cfg.Network, "failed to publish update for join", err.Error())
}
if cfg.Daemon == "install" || ncutils.IsFreeBSD() { if cfg.Daemon == "install" || ncutils.IsFreeBSD() {
err = daemon.InstallDaemon() err = daemon.InstallDaemon()
if err != nil { if err != nil {

View File

@@ -44,7 +44,7 @@ func UpdateLocalListenPort(nodeCfg *config.ClientConfig) error {
return err return err
} }
if err := PublishNodeUpdate(nodeCfg); err != nil { if err := PublishNodeUpdate(nodeCfg); err != nil {
logger.Log(0, "could not publish local port change") logger.Log(0, "could not publish local port change", err.Error())
} }
} }
return err return err

View File

@@ -21,7 +21,8 @@ import (
) )
// Checkin -- go routine that checks for public or local ip changes, publishes changes // Checkin -- go routine that checks for public or local ip changes, publishes changes
// if there are no updates, simply "pings" the server as a checkin //
// if there are no updates, simply "pings" the server as a checkin
func Checkin(ctx context.Context, wg *sync.WaitGroup) { func Checkin(ctx context.Context, wg *sync.WaitGroup) {
logger.Log(2, "starting checkin goroutine") logger.Log(2, "starting checkin goroutine")
defer wg.Done() defer wg.Done()
@@ -141,17 +142,14 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
return err return err
} }
client, err := setupMQTT(nodeCfg, true)
if err != nil {
return fmt.Errorf("mq setup error %w", err)
}
defer client.Disconnect(250)
encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
if err != nil { if err != nil {
return err return err
} }
if mqclient == nil {
if token := client.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil { return errors.New("unable to publish ... no mqclient")
}
if token := mqclient.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
logger.Log(0, "could not connect to broker at "+nodeCfg.Server.Server+":"+nodeCfg.Server.MQPort) logger.Log(0, "could not connect to broker at "+nodeCfg.Server.Server+":"+nodeCfg.Server.MQPort)
var err error var err error
if token.Error() == nil { if token.Error() == nil {