diff --git a/netclient/functions/common.go b/netclient/functions/common.go index b6d4f397..1b551564 100644 --- a/netclient/functions/common.go +++ b/netclient/functions/common.go @@ -303,8 +303,8 @@ func WipeLocal(network string) error { log.Println(err.Error()) } } - if ncutils.FileExists(home + "nm-" + network + ".conf") { - err = os.Remove(home + "nm-" + network + ".conf") + if ncutils.FileExists(home + ifacename + ".conf") { + err = os.Remove(home + ifacename + ".conf") if err != nil { log.Println("error removing .conf:") log.Println(err.Error()) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 46fd9dd6..30d3c52d 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -28,6 +28,7 @@ import ( // == Message Caches == var keepalive = new(sync.Map) var messageCache = new(sync.Map) +var networkcontext = new(sync.Map) const lastNodeUpdate = "lnu" const lastPeerUpdate = "lpu" @@ -65,21 +66,26 @@ func read(network, which string) string { // Daemon runs netclient daemon from command line func Daemon() error { - ctx, cancel := context.WithCancel(context.Background()) networks, err := ncutils.GetSystemNetworks() if err != nil { - cancel() return err } for _, network := range networks { + ctx, cancel := context.WithCancel(context.Background()) + networkcontext.Store(network, cancel) go MessageQueue(ctx, network) } quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGTERM, os.Interrupt) <-quit - cancel() + for _, network := range networks { + if cancel, ok := networkcontext.Load(network); ok { + cancel.(context.CancelFunc)() + } + } ncutils.Log("all done") return nil + } // SetupMQTT creates a connection to broker and return client @@ -87,7 +93,6 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() server := getServerAddress(cfg) opts.AddBroker(server + ":1883") - opts.SetDefaultPublishHandler(All) client := mqtt.NewClient(opts) tperiod := time.Now().Add(12 * time.Second) @@ -197,10 +202,18 @@ func MessageQueue(ctx context.Context, network string) { ncutils.Log("leader not defined for network " + cfg.Node.Network) } defer client.Disconnect(250) - go MonitorKeepalive(ctx, client, &cfg) - go Checkin(ctx, &cfg, network) + wg := &sync.WaitGroup{} + wg.Add(2) + keepalivectx, keepalivecancel := context.WithCancel(context.Background()) + go MonitorKeepalive(keepalivectx, wg, client, &cfg) + checkinctx, checkincancel := context.WithCancel(context.Background()) + go Checkin(checkinctx, wg, &cfg, network) <-ctx.Done() - ncutils.Log("shutting down daemon") + keepalivecancel() + checkincancel() + ncutils.Log("shutting down message queue for network " + network) + wg.Wait() + ncutils.Log("shutdown complete") } // All -- mqtt message hander for all ('#') topics @@ -254,10 +267,13 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { cfg.Node = newNode switch newNode.Action { case models.NODE_DELETE: - ncutils.Log("delete recieved") - if token := client.Unsubscribe(fmt.Sprintf("update/%s/%s", newNode.Network, newNode.ID), fmt.Sprintf("peers/%s/%s", newNode.Network, newNode.ID)); token.Wait() && token.Error() != nil { - ncutils.PrintLog("error unsubscribing during node deletion", 1) + if cancel, ok := networkcontext.Load(newNode.Network); ok { + ncutils.Log("cancelling message queue context for " + newNode.Network) + cancel.(context.CancelFunc)() + } else { + ncutils.Log("failed to kill go routines for network " + newNode.Network) } + ncutils.Log("deleting configuration files") if err := WipeLocal(cfg.Network); err != nil { ncutils.PrintLog("error deleting local instance: "+err.Error(), 1) ncutils.PrintLog("Please perform manual clean up", 1) @@ -298,7 +314,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { } if ifaceDelta { ncutils.Log("applying WG conf to " + file) - err = wireguard.ApplyWGQuickConf(file, cfg.Node.Interface) + err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file) if err != nil { ncutils.Log("error restarting wg after node update " + err.Error()) return @@ -370,10 +386,12 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { } // MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe -func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) { +func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) { + defer wg.Done() for { select { case <-ctx.Done(): + ncutils.Log("cancel recieved, monitor keepalive exiting") return case <-time.After(time.Second * 150): var keepalivetime time.Time @@ -467,7 +485,8 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { // Checkin -- go routine that checks for public or local ip changes, publishes changes // if there are no updates, simply "pings" the server as a checkin -func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) { +func Checkin(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig, network string) { + defer wg.Done() for { select { case <-ctx.Done():