mirror of
				https://github.com/gravitl/netmaker.git
				synced 2025-10-30 11:46:28 +08:00 
			
		
		
		
	cleaned up logs, go initial updates working
This commit is contained in:
		| @@ -9,18 +9,9 @@ import ( | ||||
| 	"github.com/gravitl/netmaker/netclient/ncutils" | ||||
| ) | ||||
|  | ||||
| // JoinCommsNetwork -- Join the message queue comms network | ||||
| func JoinCommsNetwork(cfg config.ClientConfig) error { | ||||
| 	if err := functions.JoinNetwork(cfg, "", true); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Join - join command to run from cli | ||||
| func Join(cfg config.ClientConfig, privateKey string) error { | ||||
| 	var err error | ||||
| 	//check if comms network exists | ||||
| // JoinComms -- Join the message queue comms network if it doesn't have it | ||||
| // tries to ping if already found locally, if fail ping pull for best effort for communication | ||||
| func JoinComms(cfg *config.ClientConfig) error { | ||||
| 	var commsCfg config.ClientConfig | ||||
| 	commsCfg.Network = cfg.Server.CommsNetwork | ||||
| 	commsCfg.Node.Network = cfg.Server.CommsNetwork | ||||
| @@ -30,22 +21,26 @@ func Join(cfg config.ClientConfig, privateKey string) error { | ||||
| 	commsCfg.Server.CoreDNSAddr = cfg.Server.CoreDNSAddr | ||||
| 	commsCfg.ReadConfig() | ||||
| 	if commsCfg.Node.Name == "" { | ||||
| 		if err := JoinCommsNetwork(commsCfg); err != nil { | ||||
| 			ncutils.Log("could not join comms network " + err.Error()) | ||||
| 		if err := functions.JoinNetwork(commsCfg, "", true); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} else { // check if comms is currently reachable | ||||
| 		if err := functions.PingServer(&commsCfg); err != nil { | ||||
| 			if err := functions.LeaveNetwork(commsCfg.Network); err != nil { | ||||
| 				ncutils.Log("could not leave comms network " + err.Error()) | ||||
| 				return err | ||||
| 			} | ||||
| 			if err := JoinCommsNetwork(commsCfg); err != nil { | ||||
| 				ncutils.Log("could not join comms network " + err.Error()) | ||||
| 			if err = Pull(commsCfg); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Join - join command to run from cli | ||||
| func Join(cfg config.ClientConfig, privateKey string) error { | ||||
| 	var err error | ||||
| 	//check if comms network exists | ||||
| 	if err = JoinComms(&cfg); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	//join network | ||||
| 	err = functions.JoinNetwork(cfg, privateKey, false) | ||||
| @@ -147,6 +142,7 @@ func Uninstall() error { | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| // Daemon - runs the daemon | ||||
| func Daemon() error { | ||||
| 	err := functions.Daemon() | ||||
| 	return err | ||||
|   | ||||
| @@ -35,17 +35,7 @@ type cachedMessage struct { | ||||
|  | ||||
| // Daemon runs netclient daemon from command line | ||||
| func Daemon() error { | ||||
| 	commsNetworks, err := getCommsNetworks() | ||||
| 	if err != nil { | ||||
| 		return errors.New("no comm networks exist") | ||||
| 	} | ||||
| 	for net := range commsNetworks { | ||||
| 		ncutils.PrintLog("started comms network daemon, "+net, 1) | ||||
| 		client := setupMQTT(false, net) | ||||
| 		defer client.Disconnect(250) | ||||
| 	} | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	// == initial pull of all networks == | ||||
| 	networks, _ := ncutils.GetSystemNetworks() | ||||
| 	for _, network := range networks { | ||||
| 		var cfg config.ClientConfig | ||||
| @@ -53,37 +43,61 @@ func Daemon() error { | ||||
| 		cfg.ReadConfig() | ||||
| 		initialPull(cfg.Network) | ||||
| 	} | ||||
|  | ||||
| 	// == get all the comms networks on machine == | ||||
| 	commsNetworks, err := getCommsNetworks(networks[:]) | ||||
| 	if err != nil { | ||||
| 		return errors.New("no comm networks exist") | ||||
| 	} | ||||
|  | ||||
| 	// == subscribe to all nodes on each comms network on machine == | ||||
| 	for currCommsNet := range commsNetworks { | ||||
| 		ncutils.PrintLog("started comms network daemon, "+currCommsNet, 1) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		networkcontext.Store(currCommsNet, cancel) | ||||
| 		go messageQueue(ctx, currCommsNet) | ||||
| 	} | ||||
|  | ||||
| 	// == add waitgroup and cancel for checkin routine == | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	wg.Add(1) | ||||
| 	go Checkin(ctx, wg) | ||||
| 	go Checkin(ctx, &wg, commsNetworks) | ||||
| 	quit := make(chan os.Signal, 1) | ||||
| 	signal.Notify(quit, syscall.SIGTERM, os.Interrupt) | ||||
| 	<-quit | ||||
| 	for currCommsNet := range commsNetworks { | ||||
| 		if cancel, ok := networkcontext.Load(currCommsNet); ok { | ||||
| 			cancel.(context.CancelFunc)() | ||||
| 		} | ||||
| 	} | ||||
| 	cancel() | ||||
| 	ncutils.Log("shutting down message queue ") | ||||
| 	ncutils.Log("shutting down netclient daemon") | ||||
| 	wg.Wait() | ||||
| 	ncutils.Log("shutdown complete") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // UpdateKeys -- updates private key and returns new publickey | ||||
| func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { | ||||
| func UpdateKeys(nodeCfg *config.ClientConfig, client mqtt.Client) error { | ||||
| 	ncutils.Log("received message to update keys") | ||||
| 	key, err := wgtypes.GeneratePrivateKey() | ||||
| 	if err != nil { | ||||
| 		ncutils.Log("error generating privatekey " + err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" | ||||
| 	file := ncutils.GetNetclientPathSpecific() + nodeCfg.Node.Interface + ".conf" | ||||
| 	if err := wireguard.UpdatePrivateKey(file, key.String()); err != nil { | ||||
| 		ncutils.Log("error updating wireguard key " + err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| 	cfg.Node.PublicKey = key.PublicKey().String() | ||||
| 	if err := config.ModConfig(&cfg.Node); err != nil { | ||||
| 	nodeCfg.Node.PublicKey = key.PublicKey().String() | ||||
| 	if err := config.ModConfig(&nodeCfg.Node); err != nil { | ||||
| 		ncutils.Log("error updating local config " + err.Error()) | ||||
| 	} | ||||
| 	PublishNodeUpdate(cfg) | ||||
| 	if err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file); err != nil { | ||||
| 	var commsCfg = getCommsCfgByNode(&nodeCfg.Node) | ||||
| 	PublishNodeUpdate(&commsCfg, nodeCfg) | ||||
| 	if err = wireguard.ApplyConf(&nodeCfg.Node, nodeCfg.Node.Interface, file); err != nil { | ||||
| 		ncutils.Log("error applying new config " + err.Error()) | ||||
| 		return err | ||||
| 	} | ||||
| @@ -91,8 +105,9 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { | ||||
| } | ||||
|  | ||||
| // PingServer -- checks if server is reachable | ||||
| func PingServer(cfg *config.ClientConfig) error { | ||||
| 	node := getServerAddress(cfg) | ||||
| // use commsCfg only* | ||||
| func PingServer(commsCfg *config.ClientConfig) error { | ||||
| 	node := getServerAddress(commsCfg) | ||||
| 	pinger, err := ping.NewPinger(node) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -108,16 +123,71 @@ func PingServer(cfg *config.ClientConfig) error { | ||||
|  | ||||
| // == Private == | ||||
|  | ||||
| // sets MQ client subscriptions for a specific node config | ||||
| // should be called for each node belonging to a given comms network | ||||
| func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) { | ||||
| 	if nodeCfg.DebugOn { | ||||
| 		if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { | ||||
| 			ncutils.Log(token.Error().Error()) | ||||
| 			return | ||||
| 		} | ||||
| 		ncutils.Log("subscribed to all topics for debugging purposes") | ||||
| 	} | ||||
|  | ||||
| 	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.Log(token.Error().Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if nodeCfg.DebugOn { | ||||
| 		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", nodeCfg.Node.Name, nodeCfg.Node.Network, nodeCfg.Node.ID)) | ||||
| 	} | ||||
| 	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.Log(token.Error().Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if nodeCfg.DebugOn { | ||||
| 		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", nodeCfg.Node.Name, nodeCfg.Node.Network, nodeCfg.Node.ID)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // on a delete usually, pass in the nodecfg to unsubscribe client broker communications | ||||
| // for the node in nodeCfg | ||||
| func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) { | ||||
| 	client.Unsubscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)) | ||||
| 	var ok = true | ||||
| 	if token := client.Unsubscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.PrintLog("unable to unsubscribe from updates for node "+nodeCfg.Node.Name+"\n"+token.Error().Error(), 1) | ||||
| 		ok = false | ||||
| 	} | ||||
| 	if token := client.Unsubscribe(fmt.Sprintf("peers/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.PrintLog("unable to unsubscribe from peer updates for node "+nodeCfg.Node.Name+"\n"+token.Error().Error(), 1) | ||||
| 		ok = false | ||||
| 	} | ||||
| 	if ok { | ||||
| 		ncutils.PrintLog("successfully unsubscribed node "+nodeCfg.Node.ID+" : "+nodeCfg.Node.Name, 0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // sets up Message Queue and subsribes/publishes updates to/from server | ||||
| // the client should subscribe to ALL nodes that exist on unique comms network locally | ||||
| func messageQueue(ctx context.Context, commsNet string) { | ||||
| 	var commsCfg config.ClientConfig | ||||
| 	commsCfg.Network = commsNet | ||||
| 	commsCfg.ReadConfig() | ||||
| 	ncutils.Log("netclient daemon started for network: " + commsNet) | ||||
| 	client := setupMQTT(&commsCfg, false) | ||||
| 	defer client.Disconnect(250) | ||||
| 	<-ctx.Done() | ||||
| 	ncutils.Log("shutting down daemon for comms network " + commsNet) | ||||
| } | ||||
|  | ||||
| // setupMQTT creates a connection to broker and return client | ||||
| func setupMQTT(publish bool, networkName string) mqtt.Client { | ||||
| 	var cfg *config.ClientConfig | ||||
| 	cfg.Network = networkName | ||||
| 	cfg.ReadConfig() | ||||
| // utilizes comms client configs to setup connections | ||||
| func setupMQTT(commsCfg *config.ClientConfig, publish bool) mqtt.Client { | ||||
| 	opts := mqtt.NewClientOptions() | ||||
| 	server := getServerAddress(cfg) | ||||
| 	opts.AddBroker(server + ":1883") // TODO get the appropriate port of the comms mq server | ||||
| 	id := ncutils.MakeRandomString(23) | ||||
| 	opts.ClientID = id | ||||
| 	server := getServerAddress(commsCfg) | ||||
| 	opts.AddBroker(server + ":1883")             // TODO get the appropriate port of the comms mq server | ||||
| 	opts.ClientID = ncutils.MakeRandomString(23) // helps avoid id duplication on broker | ||||
| 	opts.SetDefaultPublishHandler(All) | ||||
| 	opts.SetAutoReconnect(true) | ||||
| 	opts.SetConnectRetry(true) | ||||
| @@ -131,18 +201,18 @@ func setupMQTT(publish bool, networkName string) mqtt.Client { | ||||
| 				ncutils.Log("error retriving networks " + err.Error()) | ||||
| 			} | ||||
| 			for _, network := range networks { | ||||
| 				var currConf config.ClientConfig | ||||
| 				currConf.Network = network | ||||
| 				currConf.ReadConfig() | ||||
| 				SetSubscriptions(client, &currConf) | ||||
| 				var currNodeCfg config.ClientConfig | ||||
| 				currNodeCfg.Network = network | ||||
| 				currNodeCfg.ReadConfig() | ||||
| 				setSubscriptions(client, &currNodeCfg) | ||||
| 			} | ||||
| 		} | ||||
| 	}) | ||||
| 	opts.SetOrderMatters(true) | ||||
| 	opts.SetResumeSubs(true) | ||||
| 	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { | ||||
| 		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) | ||||
| 		_, err := Pull(cfg.Node.Network, true) | ||||
| 		ncutils.Log("detected broker connection lost, running pull for " + commsCfg.Node.Network) | ||||
| 		_, err := Pull(commsCfg.Node.Network, true) | ||||
| 		if err != nil { | ||||
| 			ncutils.Log("could not run pull, server unreachable: " + err.Error()) | ||||
| 			ncutils.Log("waiting to retry...") | ||||
| @@ -155,10 +225,10 @@ func setupMQTT(publish bool, networkName string) mqtt.Client { | ||||
| 	for { | ||||
| 		//if after 12 seconds, try a gRPC pull on the last try | ||||
| 		if time.Now().After(tperiod) { | ||||
| 			ncutils.Log("running pull for " + cfg.Node.Network) | ||||
| 			_, err := Pull(cfg.Node.Network, true) | ||||
| 			ncutils.Log("running pull for " + commsCfg.Node.Network) | ||||
| 			_, err := Pull(commsCfg.Node.Network, true) | ||||
| 			if err != nil { | ||||
| 				ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) | ||||
| 				ncutils.Log("could not run pull, exiting " + commsCfg.Node.Network + " setup: " + err.Error()) | ||||
| 				return client | ||||
| 			} | ||||
| 			time.Sleep(time.Second) | ||||
| @@ -166,10 +236,10 @@ func setupMQTT(publish bool, networkName string) mqtt.Client { | ||||
| 		if token := client.Connect(); token.Wait() && token.Error() != nil { | ||||
| 			ncutils.Log("unable to connect to broker, retrying ...") | ||||
| 			if time.Now().After(tperiod) { | ||||
| 				ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) | ||||
| 				ncutils.Log("could not connect to broker, exiting " + commsCfg.Node.Network + " setup: " + token.Error().Error()) | ||||
| 				if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { | ||||
| 					ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0) | ||||
| 					Pull(cfg.Node.Network, true) | ||||
| 					Pull(commsCfg.Node.Network, true) | ||||
| 					daemon.Restart() | ||||
| 				} | ||||
| 				return client | ||||
| @@ -182,35 +252,9 @@ func setupMQTT(publish bool, networkName string) mqtt.Client { | ||||
| 	return client | ||||
| } | ||||
|  | ||||
| // SetSubscriptions - sets MQ subscriptions | ||||
| func SetSubscriptions(client mqtt.Client, cfg *config.ClientConfig) { | ||||
| 	if cfg.DebugOn { | ||||
| 		if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { | ||||
| 			ncutils.Log(token.Error().Error()) | ||||
| 			return | ||||
| 		} | ||||
| 		ncutils.Log("subscribed to all topics for debugging purposes") | ||||
| 	} | ||||
|  | ||||
| 	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.Log(token.Error().Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if cfg.DebugOn { | ||||
| 		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) | ||||
| 	} | ||||
| 	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { | ||||
| 		ncutils.Log(token.Error().Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if cfg.DebugOn { | ||||
| 		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // publishes a message to server to update peers on this peer's behalf | ||||
| func publishSignal(cfg *config.ClientConfig, signal byte) error { | ||||
| 	if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), []byte{signal}, 1); err != nil { | ||||
| func publishSignal(commsCfg, nodeCfg *config.ClientConfig, signal byte) error { | ||||
| 	if err := publish(commsCfg, nodeCfg, fmt.Sprintf("signal/%s", nodeCfg.Node.ID), []byte{signal}, 1); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| @@ -248,18 +292,19 @@ func parseNetworkFromTopic(topic string) string { | ||||
| 	return strings.Split(topic, "/")[1] | ||||
| } | ||||
|  | ||||
| func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) { | ||||
| // should only ever use node client configs | ||||
| func decryptMsg(nodeCfg *config.ClientConfig, msg []byte) ([]byte, error) { | ||||
| 	if len(msg) <= 24 { // make sure message is of appropriate length | ||||
| 		return nil, fmt.Errorf("recieved invalid message from broker %v", msg) | ||||
| 	} | ||||
|  | ||||
| 	// setup the keys | ||||
| 	diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network) | ||||
| 	diskKey, keyErr := auth.RetrieveTrafficKey(nodeCfg.Node.Network) | ||||
| 	if keyErr != nil { | ||||
| 		return nil, keyErr | ||||
| 	} | ||||
|  | ||||
| 	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server) | ||||
| 	serverPubKey, err := ncutils.ConvertBytesToKey(nodeCfg.Node.TrafficKeys.Server) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -277,12 +322,8 @@ func getServerAddress(cfg *config.ClientConfig) string { | ||||
| 	return server.Address | ||||
| } | ||||
|  | ||||
| func getCommsNetworks() (map[string]bool, error) { | ||||
| func getCommsNetworks(networks []string) (map[string]bool, error) { | ||||
| 	var cfg config.ClientConfig | ||||
| 	networks, err := ncutils.GetSystemNetworks() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	var response = make(map[string]bool, 1) | ||||
| 	for _, network := range networks { | ||||
| 		cfg.Network = network | ||||
| @@ -292,6 +333,13 @@ func getCommsNetworks() (map[string]bool, error) { | ||||
| 	return response, nil | ||||
| } | ||||
|  | ||||
| func getCommsCfgByNode(node *models.Node) config.ClientConfig { | ||||
| 	var commsCfg config.ClientConfig | ||||
| 	commsCfg.Network = node.Network | ||||
| 	commsCfg.ReadConfig() | ||||
| 	return commsCfg | ||||
| } | ||||
|  | ||||
| // == Message Caches == | ||||
|  | ||||
| func insert(network, which, cache string) { | ||||
|   | ||||
| @@ -242,7 +242,8 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string, iscomms bool) error | ||||
| 				go func() { | ||||
| 					if !local.SetDNSWithRetry(node, server.Address) { | ||||
| 						cfg.Node.DNSOn = "no" | ||||
| 						PublishNodeUpdate(&cfg) | ||||
| 						var currentCommsCfg = getCommsCfgByNode(&cfg.Node) | ||||
| 						PublishNodeUpdate(¤tCommsCfg, &cfg) | ||||
| 					} | ||||
| 				}() | ||||
| 				break | ||||
|   | ||||
| @@ -1,14 +1,12 @@ | ||||
| package functions | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"runtime" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/davecgh/go-spew/spew" | ||||
| 	mqtt "github.com/eclipse/paho.mqtt.golang" | ||||
| 	"github.com/gravitl/netmaker/models" | ||||
| 	"github.com/gravitl/netmaker/netclient/config" | ||||
| @@ -27,12 +25,13 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||||
| // NodeUpdate -- mqtt message handler for /update/<NodeID> topic | ||||
| func NodeUpdate(client mqtt.Client, msg mqtt.Message) { | ||||
| 	var newNode models.Node | ||||
| 	var cfg config.ClientConfig | ||||
| 	var nodeCfg config.ClientConfig | ||||
| 	var network = parseNetworkFromTopic(msg.Topic()) | ||||
| 	cfg.Network = network | ||||
| 	cfg.ReadConfig() | ||||
| 	nodeCfg.Network = network | ||||
| 	nodeCfg.ReadConfig() | ||||
| 	var commsCfg = getCommsCfgByNode(&nodeCfg.Node) | ||||
|  | ||||
| 	data, dataErr := decryptMsg(&cfg, msg.Payload()) | ||||
| 	data, dataErr := decryptMsg(&nodeCfg, msg.Payload()) | ||||
| 	if dataErr != nil { | ||||
| 		return | ||||
| 	} | ||||
| @@ -50,70 +49,58 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { | ||||
| 	} | ||||
| 	insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache | ||||
|  | ||||
| 	//check if interface name has changed if so delete. | ||||
| 	if cfg.Node.Interface != newNode.Interface { | ||||
| 		if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil { | ||||
| 			ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 0) | ||||
| 		} | ||||
| 	} | ||||
| 	//ensure that OS never changes | ||||
| 	// ensure that OS never changes | ||||
| 	newNode.OS = runtime.GOOS | ||||
| 	// check if interface needs to delta | ||||
| 	ifaceDelta := ncutils.IfaceDelta(&cfg.Node, &newNode) | ||||
| 	shouldDNSChange := cfg.Node.DNSOn != newNode.DNSOn | ||||
| 	ifaceDelta := ncutils.IfaceDelta(&nodeCfg.Node, &newNode) | ||||
| 	shouldDNSChange := nodeCfg.Node.DNSOn != newNode.DNSOn | ||||
|  | ||||
| 	cfg.Node = newNode | ||||
| 	nodeCfg.Node = newNode | ||||
| 	switch newNode.Action { | ||||
| 	case models.NODE_DELETE: | ||||
| 		if cancel, ok := networkcontext.Load(newNode.Network); ok { | ||||
| 			ncutils.Log("cancelling message queue context for " + newNode.Network) | ||||
| 			cancel.(context.CancelFunc)() | ||||
| 		} else { | ||||
| 			ncutils.Log("failed to kill go routines for network " + newNode.Network) | ||||
| 		} | ||||
| 		ncutils.PrintLog(fmt.Sprintf("received delete request for %s", cfg.Node.Name), 0) | ||||
| 		if err = LeaveNetwork(cfg.Node.Network); err != nil { | ||||
| 		ncutils.PrintLog(fmt.Sprintf("received delete request for %s", nodeCfg.Node.Name), 0) | ||||
| 		unsubscribeNode(client, &nodeCfg) | ||||
| 		if err = LeaveNetwork(nodeCfg.Node.Network); err != nil { | ||||
| 			if !strings.Contains("rpc error", err.Error()) { | ||||
| 				ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", cfg.Node.Network), 0) | ||||
| 				ncutils.PrintLog(fmt.Sprintf("failed to leave, please check that local files for network %s were removed", nodeCfg.Node.Network), 0) | ||||
| 				return | ||||
| 			} | ||||
| 			ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 0) | ||||
| 			return | ||||
| 		} | ||||
| 		ncutils.PrintLog(fmt.Sprintf("%s was removed", cfg.Node.Name), 0) | ||||
| 		ncutils.PrintLog(fmt.Sprintf("%s was removed", nodeCfg.Node.Name), 0) | ||||
| 		return | ||||
| 	case models.NODE_UPDATE_KEY: | ||||
| 		if err := UpdateKeys(&cfg, client); err != nil { | ||||
| 		if err := UpdateKeys(&nodeCfg, client); err != nil { | ||||
| 			ncutils.PrintLog("err updating wireguard keys: "+err.Error(), 0) | ||||
| 		} | ||||
| 	case models.NODE_NOOP: | ||||
| 	default: | ||||
| 	} | ||||
| 	// Save new config | ||||
| 	cfg.Node.Action = models.NODE_NOOP | ||||
| 	if err := config.Write(&cfg, cfg.Network); err != nil { | ||||
| 	nodeCfg.Node.Action = models.NODE_NOOP | ||||
| 	if err := config.Write(&nodeCfg, nodeCfg.Network); err != nil { | ||||
| 		ncutils.PrintLog("error updating node configuration: "+err.Error(), 0) | ||||
| 	} | ||||
| 	nameserver := cfg.Server.CoreDNSAddr | ||||
| 	nameserver := nodeCfg.Server.CoreDNSAddr | ||||
| 	privateKey, err := wireguard.RetrievePrivKey(newNode.Network) | ||||
| 	if err != nil { | ||||
| 		ncutils.Log("error reading PrivateKey " + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" | ||||
| 	file := ncutils.GetNetclientPathSpecific() + nodeCfg.Node.Interface + ".conf" | ||||
|  | ||||
| 	if err := wireguard.UpdateWgInterface(file, privateKey, nameserver, newNode); err != nil { | ||||
| 		ncutils.Log("error updating wireguard config " + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers | ||||
| 		ackErr := publishSignal(&cfg, ncutils.ACK) | ||||
| 		ackErr := publishSignal(&commsCfg, &nodeCfg, ncutils.ACK) | ||||
| 		if ackErr != nil { | ||||
| 			ncutils.Log("could not notify server that it received an interface update") | ||||
| 		} else { | ||||
| 			ncutils.Log("signalled acknowledgement of change to server") | ||||
| 		} | ||||
| 		ncutils.Log("applying WG conf to " + file) | ||||
| 		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file) | ||||
| 		err = wireguard.ApplyConf(&nodeCfg.Node, nodeCfg.Node.Interface, file) | ||||
| 		if err != nil { | ||||
| 			ncutils.Log("error restarting wg after node update " + err.Error()) | ||||
| 			return | ||||
| @@ -128,17 +115,17 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		doneErr := publishSignal(&cfg, ncutils.DONE) | ||||
| 		doneErr := publishSignal(&commsCfg, &nodeCfg, ncutils.DONE) | ||||
| 		if doneErr != nil { | ||||
| 			ncutils.Log("could not notify server to update peers after interface change") | ||||
| 		} else { | ||||
| 			ncutils.Log("signalled finshed interface update to server") | ||||
| 			ncutils.Log("signalled finished interface update to server") | ||||
| 		} | ||||
| 	} | ||||
| 	//deal with DNS | ||||
| 	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" { | ||||
| 	if newNode.DNSOn != "yes" && shouldDNSChange && nodeCfg.Node.Interface != "" { | ||||
| 		ncutils.Log("settng DNS off") | ||||
| 		_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+cfg.Node.Interface, true) | ||||
| 		_, err := ncutils.RunCmd("/usr/bin/resolvectl revert "+nodeCfg.Node.Interface, true) | ||||
| 		if err != nil { | ||||
| 			ncutils.Log("error applying dns" + err.Error()) | ||||
| 		} | ||||
| @@ -170,7 +157,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { | ||||
| 	insert(peerUpdate.Network, lastPeerUpdate, string(data)) | ||||
|  | ||||
| 	file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" | ||||
| 	spew.Dump(peerUpdate.Peers) | ||||
| 	err = wireguard.UpdateWgPeers(file, peerUpdate.Peers) | ||||
| 	if err != nil { | ||||
| 		ncutils.Log("error updating wireguard peers" + err.Error()) | ||||
|   | ||||
| @@ -14,12 +14,12 @@ import ( | ||||
|  | ||||
| // Checkin  -- go routine that checks for public or local ip changes, publishes changes | ||||
| //   if there are no updates, simply "pings" the server as a checkin | ||||
| func Checkin(ctx context.Context, wg sync.WaitGroup) { | ||||
| func Checkin(ctx context.Context, wg *sync.WaitGroup, currentComms map[string]bool) { | ||||
| 	defer wg.Done() | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			ncutils.Log("Checkin cancelled") | ||||
| 			ncutils.Log("checkin routine closed") | ||||
| 			return | ||||
| 			//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ?? | ||||
| 		case <-time.After(time.Second * 60): | ||||
| @@ -29,99 +29,106 @@ func Checkin(ctx context.Context, wg sync.WaitGroup) { | ||||
| 			if err != nil { | ||||
| 				return | ||||
| 			} | ||||
| 			for _, network := range networks { | ||||
| 				if network == ncutils.COMMS_NETWORK_NAME { | ||||
| 					continue | ||||
| 				} | ||||
| 				var cfg *config.ClientConfig | ||||
| 				cfg.Network = network | ||||
| 				cfg.ReadConfig() | ||||
| 				if cfg.Node.IsStatic != "yes" { | ||||
| 					extIP, err := ncutils.GetPublicIP() | ||||
| 					if err != nil { | ||||
| 						ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1) | ||||
| 			for commsNet := range currentComms { | ||||
| 				var currCommsCfg config.ClientConfig | ||||
| 				currCommsCfg.Network = commsNet | ||||
| 				currCommsCfg.ReadConfig() | ||||
| 				for _, network := range networks { | ||||
| 					var nodeCfg config.ClientConfig | ||||
| 					nodeCfg.Network = network | ||||
| 					nodeCfg.ReadConfig() | ||||
| 					if nodeCfg.Node.CommID != commsNet { | ||||
| 						continue // skip if not on current comms network | ||||
| 					} | ||||
| 					if cfg.Node.Endpoint != extIP && extIP != "" { | ||||
| 						ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1) | ||||
| 						cfg.Node.Endpoint = extIP | ||||
| 						if err := PublishNodeUpdate(cfg); err != nil { | ||||
| 							ncutils.Log("could not publish endpoint change") | ||||
| 					if nodeCfg.Node.IsStatic != "yes" { | ||||
| 						extIP, err := ncutils.GetPublicIP() | ||||
| 						if err != nil { | ||||
| 							ncutils.PrintLog("error encountered checking public ip addresses: "+err.Error(), 1) | ||||
| 						} | ||||
| 						if nodeCfg.Node.Endpoint != extIP && extIP != "" { | ||||
| 							ncutils.PrintLog("endpoint has changed from "+nodeCfg.Node.Endpoint+" to "+extIP, 1) | ||||
| 							nodeCfg.Node.Endpoint = extIP | ||||
| 							if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { | ||||
| 								ncutils.Log("could not publish endpoint change") | ||||
| 							} | ||||
| 						} | ||||
| 						intIP, err := getPrivateAddr() | ||||
| 						if err != nil { | ||||
| 							ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1) | ||||
| 						} | ||||
| 						if nodeCfg.Node.LocalAddress != intIP && intIP != "" { | ||||
| 							ncutils.PrintLog("local Address has changed from "+nodeCfg.Node.LocalAddress+" to "+intIP, 1) | ||||
| 							nodeCfg.Node.LocalAddress = intIP | ||||
| 							if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { | ||||
| 								ncutils.Log("could not publish local address change") | ||||
| 							} | ||||
| 						} | ||||
| 					} else if nodeCfg.Node.IsLocal == "yes" && nodeCfg.Node.LocalRange != "" { | ||||
| 						localIP, err := ncutils.GetLocalIP(nodeCfg.Node.LocalRange) | ||||
| 						if err != nil { | ||||
| 							ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1) | ||||
| 						} | ||||
| 						if nodeCfg.Node.Endpoint != localIP && localIP != "" { | ||||
| 							ncutils.PrintLog("endpoint has changed from "+nodeCfg.Node.Endpoint+" to "+localIP, 1) | ||||
| 							nodeCfg.Node.Endpoint = localIP | ||||
| 							if err := PublishNodeUpdate(&currCommsCfg, &nodeCfg); err != nil { | ||||
| 								ncutils.Log("could not publish localip change") | ||||
| 							} | ||||
| 						} | ||||
| 					} | ||||
| 					intIP, err := getPrivateAddr() | ||||
| 					if err != nil { | ||||
| 						ncutils.PrintLog("error encountered checking private ip addresses: "+err.Error(), 1) | ||||
| 					} | ||||
| 					if cfg.Node.LocalAddress != intIP && intIP != "" { | ||||
| 						ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1) | ||||
| 						cfg.Node.LocalAddress = intIP | ||||
| 						if err := PublishNodeUpdate(cfg); err != nil { | ||||
| 							ncutils.Log("could not publish local address change") | ||||
| 						} | ||||
| 					} | ||||
| 				} else if cfg.Node.IsLocal == "yes" && cfg.Node.LocalRange != "" { | ||||
| 					localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange) | ||||
| 					if err != nil { | ||||
| 						ncutils.PrintLog("error encountered checking local ip addresses: "+err.Error(), 1) | ||||
| 					} | ||||
| 					if cfg.Node.Endpoint != localIP && localIP != "" { | ||||
| 						ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1) | ||||
| 						cfg.Node.Endpoint = localIP | ||||
| 						if err := PublishNodeUpdate(cfg); err != nil { | ||||
| 							ncutils.Log("could not publish localip change") | ||||
| 						} | ||||
| 					if err := PingServer(&currCommsCfg); err != nil { | ||||
| 						ncutils.PrintLog("could not ping server on comms net, "+currCommsCfg.Network+"\n"+err.Error(), 0) | ||||
| 					} else { | ||||
| 						Hello(&currCommsCfg, &nodeCfg) | ||||
| 					} | ||||
| 				} | ||||
| 				if err := PingServer(cfg); err != nil { | ||||
| 					ncutils.PrintLog("could not ping server "+err.Error(), 0) | ||||
| 				} | ||||
| 				Hello(cfg, network) | ||||
| 				// ncutils.Log("Checkin complete") | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // PublishNodeUpdates -- saves node and pushes changes to broker | ||||
| func PublishNodeUpdate(cfg *config.ClientConfig) error { | ||||
| 	if err := config.Write(cfg, cfg.Network); err != nil { | ||||
| func PublishNodeUpdate(commsCfg, nodeCfg *config.ClientConfig) error { | ||||
| 	if err := config.Write(nodeCfg, nodeCfg.Network); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	data, err := json.Marshal(cfg.Node) | ||||
| 	data, err := json.Marshal(nodeCfg.Node) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil { | ||||
| 	if err = publish(commsCfg, nodeCfg, fmt.Sprintf("update/%s", nodeCfg.Node.ID), data, 1); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Hello -- ping the broker to let server know node is alive and doing fine | ||||
| func Hello(cfg *config.ClientConfig, network string) { | ||||
| 	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil { | ||||
| // Hello -- ping the broker to let server know node it's alive and well | ||||
| func Hello(commsCfg, nodeCfg *config.ClientConfig) { | ||||
| 	if err := publish(commsCfg, nodeCfg, fmt.Sprintf("ping/%s", nodeCfg.Node.ID), []byte(ncutils.Version), 0); err != nil { | ||||
| 		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err)) | ||||
| 		ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect") | ||||
| 		_, err := Pull(cfg.Node.Network, true) | ||||
| 		ncutils.Log("running pull on " + commsCfg.Node.Network + " to reconnect") | ||||
| 		_, err := Pull(commsCfg.Node.Network, true) | ||||
| 		if err != nil { | ||||
| 			ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error()) | ||||
| 			ncutils.Log("could not run pull on " + commsCfg.Node.Network + ", error: " + err.Error()) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error { | ||||
| // requires the commscfg in which to send traffic over and nodecfg of node that is publish the message | ||||
| // node cfg is so that the traffic keys of that node may be fetched for encryption | ||||
| func publish(commsCfg, nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) error { | ||||
| 	// setup the keys | ||||
| 	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network) | ||||
| 	trafficPrivKey, err := auth.RetrieveTrafficKey(nodeCfg.Node.Network) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	serverPubKey, err := ncutils.ConvertBytesToKey(cfg.Node.TrafficKeys.Server) | ||||
| 	serverPubKey, err := ncutils.ConvertBytesToKey(nodeCfg.Node.TrafficKeys.Server) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	client := setupMQTT(true, cfg.Node.CommID) | ||||
| 	client := setupMQTT(commsCfg, true) | ||||
| 	defer client.Disconnect(250) | ||||
| 	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) | ||||
| 	if err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 0xdcarns
					0xdcarns