refactor to reduce number of goroutines

This commit is contained in:
Matthew R. Kasun
2022-02-18 14:56:26 -05:00
parent 52ac4c3bc3
commit c7cf5fb2fb
3 changed files with 106 additions and 103 deletions

View File

@@ -35,50 +35,27 @@ type cachedMessage struct {
// Daemon runs netclient daemon from command line // Daemon runs netclient daemon from command line
func Daemon() error { func Daemon() error {
networks, err := ncutils.GetSystemNetworks() client := setupMQTT(false)
if err != nil { defer client.Disconnect(250)
return err wg := sync.WaitGroup{}
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func(context.Context) { networks, _ := ncutils.GetSystemNetworks()
for _, network := range networks { for _, network := range networks {
//skip comms network var cfg config.ClientConfig
if network == ncutils.COMMS_NETWORK_NAME { cfg.Network = network
continue cfg.ReadConfig()
initialPull(cfg.Network)
} }
MessageQueue(ctx, network) wg.Add(1)
} go Checkin(ctx, wg)
}(ctx)
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, os.Interrupt) signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
<-quit <-quit
cancel() cancel()
ncutils.Log("all done") ncutils.Log("shutting down message queue ")
return nil
}
// MessageQueue sets up Message Queue and subsribes/publishes updates to/from server
func MessageQueue(ctx context.Context, network string) {
ncutils.Log("netclient go routine started for " + network)
var cfg config.ClientConfig
cfg.Network = network
initialPull(cfg.Network)
cfg.ReadConfig()
ncutils.Log("daemon started for network: " + network)
client := setupMQTT(false)
defer client.Disconnect(250)
wg := &sync.WaitGroup{}
wg.Add(2)
checkinctx, checkincancel := context.WithCancel(context.Background())
go Checkin(checkinctx, wg, &cfg, network)
<-ctx.Done()
checkincancel()
ncutils.Log("shutting down message queue for network " + network)
wg.Wait() wg.Wait()
ncutils.Log("shutdown complete") ncutils.Log("shutdown complete")
return nil
} }
// UpdateKeys -- updates private key and returns new publickey // UpdateKeys -- updates private key and returns new publickey
@@ -142,31 +119,11 @@ func setupMQTT(publish bool) mqtt.Client {
opts.SetWriteTimeout(time.Minute) opts.SetWriteTimeout(time.Minute)
opts.SetOnConnectHandler(func(client mqtt.Client) { opts.SetOnConnectHandler(func(client mqtt.Client) {
if !publish { if !publish {
if cfg.DebugOn { SetSubscriptions(client, cfg)
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
ncutils.Log("subscribed to all topics for debugging purposes")
}
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
opts.SetOrderMatters(true)
opts.SetResumeSubs(true)
} }
}) })
opts.SetOrderMatters(true)
opts.SetResumeSubs(true)
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
_, err := Pull(cfg.Node.Network, true) _, err := Pull(cfg.Node.Network, true)
@@ -215,6 +172,41 @@ func setupMQTT(publish bool) mqtt.Client {
return client return client
} }
// SetSubscriptions - sets MQ subscriptions
func SetSubscriptions(client mqtt.Client, cfg *config.ClientConfig) {
if cfg.DebugOn {
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
ncutils.Log("subscribed to all topics for debugging purposes")
}
networks, err := ncutils.GetSystemNetworks()
if err != nil {
ncutils.Log("error retriving networks " + err.Error())
}
for _, network := range networks {
var cfg config.ClientConfig
cfg.Network = network
cfg.ReadConfig()
if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
ncutils.Log(token.Error().Error())
return
}
if cfg.DebugOn {
ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
}
}
}
// publishes a message to server to update peers on this peer's behalf // publishes a message to server to update peers on this peer's behalf
func publishSignal(cfg *config.ClientConfig, signal byte) error { func publishSignal(cfg *config.ClientConfig, signal byte) error {
if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil { if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil {

View File

@@ -14,7 +14,7 @@ import (
// Checkin -- go routine that checks for public or local ip changes, publishes changes // Checkin -- go routine that checks for public or local ip changes, publishes changes
// if there are no updates, simply "pings" the server as a checkin // if there are no updates, simply "pings" the server as a checkin
func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, network string) { func Checkin(ctx context.Context, wg sync.WaitGroup) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
@@ -25,6 +25,16 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
case <-time.After(time.Second * 60): case <-time.After(time.Second * 60):
// ncutils.Log("Checkin running") // ncutils.Log("Checkin running")
//read latest config //read latest config
networks, err := ncutils.GetSystemNetworks()
if err != nil {
return
}
for _, network := range networks {
if network == ncutils.COMMS_NETWORK_NAME {
continue
}
var cfg *config.ClientConfig
cfg.Network = network
cfg.ReadConfig() cfg.ReadConfig()
if cfg.Node.IsStatic != "yes" { if cfg.Node.IsStatic != "yes" {
extIP, err := ncutils.GetPublicIP() extIP, err := ncutils.GetPublicIP()
@@ -70,6 +80,7 @@ func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig,
} }
} }
} }
}
// PublishNodeUpdates -- saves node and pushes changes to broker // PublishNodeUpdates -- saves node and pushes changes to broker
func PublishNodeUpdate(cfg *config.ClientConfig) error { func PublishNodeUpdate(cfg *config.ClientConfig) error {

View File

@@ -7,6 +7,6 @@ const (
DONE = 2 DONE = 2
// KEY - key update completed signal for MQ // KEY - key update completed signal for MQ
KEY = 3 KEY = 3
// COMMS_NETWORK_NAME - name of signalling network
COMMS_NETWORK_NAME = "n37m8k3r"
) )
const COMMS_NETWORK_NAME = "n37m8k3r"