mirror of
				https://github.com/gravitl/netmaker.git
				synced 2025-10-31 20:22:44 +08:00 
			
		
		
		
	Replaced ### with - in Node.ID (local copy) to fix mqtt publishing
errors added context to all go routines removed connectivity function
This commit is contained in:
		| @@ -1,18 +1,22 @@ | |||||||
| package functions | package functions | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" |  | ||||||
| 	"log" | 	"log" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"strings" | ||||||
|  | 	"syscall" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	mqtt "github.com/eclipse/paho.mqtt.golang" | 	mqtt "github.com/eclipse/paho.mqtt.golang" | ||||||
| 	"github.com/go-ping/ping" |  | ||||||
| 	"github.com/gravitl/netmaker/netclient/config" | 	"github.com/gravitl/netmaker/netclient/config" | ||||||
| 	"github.com/gravitl/netmaker/netclient/ncutils" | 	"github.com/gravitl/netmaker/netclient/ncutils" | ||||||
| 	"golang.zx2c4.com/wireguard/wgctrl" | 	"golang.zx2c4.com/wireguard/wgctrl" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | //Daemon runs netclient daemon from command line | ||||||
| func Daemon() error { | func Daemon() error { | ||||||
| 	networks, err := ncutils.GetSystemNetworks() | 	networks, err := ncutils.GetSystemNetworks() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -26,59 +30,75 @@ func Daemon() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func Netclient(network string) { | //SetupMQTT creates a connection to broker and return client | ||||||
| 	var cfg config.ClientConfig | func SetupMQTT(cfg config.ClientConfig) mqtt.Client { | ||||||
| 	cfg.Network = network |  | ||||||
| 	cfg.ReadConfig() |  | ||||||
| 	ncutils.Log("daemon started for network:" + network) |  | ||||||
| 	//setup MQTT |  | ||||||
| 	opts := mqtt.NewClientOptions() | 	opts := mqtt.NewClientOptions() | ||||||
| 	ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883") | 	ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883") | ||||||
| 	opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883") | 	opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883") | ||||||
| 	opts.SetDefaultPublishHandler(All) | 	opts.SetDefaultPublishHandler(All) | ||||||
| 	opts.SetClientID("netclient-mqtt") |  | ||||||
| 	client := mqtt.NewClient(opts) | 	client := mqtt.NewClient(opts) | ||||||
| 	if token := client.Connect(); token.Wait() && token.Error() != nil { | 	if token := client.Connect(); token.Wait() && token.Error() != nil { | ||||||
| 		log.Fatal(token.Error()) | 		log.Fatal(token.Error()) | ||||||
| 	} | 	} | ||||||
|  | 	return client | ||||||
|  | } | ||||||
|  |  | ||||||
|  | //Netclient sets up Message Queue and subsribes/publishes updates to/from server | ||||||
|  | func Netclient(network string) { | ||||||
|  | 	ctx, cancel := context.WithCancel(context.Background()) | ||||||
|  | 	var cfg config.ClientConfig | ||||||
|  | 	cfg.Network = network | ||||||
|  | 	cfg.ReadConfig() | ||||||
|  | 	//fix NodeID to remove ### so NodeID can be used as message topic | ||||||
|  | 	//remove with GRA-73 | ||||||
|  | 	cfg.Node.ID = strings.ReplaceAll(cfg.Node.ID, "###", "-") | ||||||
|  | 	ncutils.Log("daemon started for network:" + network) | ||||||
|  | 	client := SetupMQTT(cfg) | ||||||
| 	if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { | 	if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { | ||||||
| 		log.Fatal(token.Error()) | 		log.Fatal(token.Error()) | ||||||
| 	} | 	} | ||||||
| 	client.AddRoute("update/"+network+"/"+cfg.Node.MacAddress, NodeUpdate) | 	client.AddRoute("update/"+cfg.Node.ID, NodeUpdate) | ||||||
| 	client.AddRoute("update/"+network+"/peers", UpdatePeers) | 	client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers) | ||||||
| 	client.AddRoute("update/"+network+"/keys", UpdateKeys) | 	client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys) | ||||||
| 	client.AddRoute("update/"+network+"/keys/"+cfg.Node.MacAddress, UpdateKeys) |  | ||||||
| 	defer client.Disconnect(250) | 	defer client.Disconnect(250) | ||||||
| 	go Checkin(client, network) | 	go Checkin(ctx, cfg, network) | ||||||
| 	//go Metrics(client, network) | 	go Metrics(ctx, cfg, network) | ||||||
| 	//go Connectivity(client, network) | 	quit := make(chan os.Signal, 1) | ||||||
| 	for { | 	signal.Notify(quit, syscall.SIGTERM, os.Interrupt) | ||||||
| 	} | 	<-quit | ||||||
|  | 	cancel() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | //All -- mqtt message hander for all ('#') topics | ||||||
| var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||||||
| 	ncutils.Log("Topic: " + string(msg.Topic())) | 	ncutils.Log("Topic: " + string(msg.Topic())) | ||||||
| 	ncutils.Log("Message: " + string(msg.Payload())) | 	ncutils.Log("Message: " + string(msg.Payload())) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | //NodeUpdate -- mqtt message handler for /update/<NodeID> topic | ||||||
| var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||||||
| 	ncutils.Log("received message to update node " + string(msg.Payload())) | 	ncutils.Log("received message to update node " + string(msg.Payload())) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | //NodeUpdate -- mqtt message handler for /update/peers/<NodeID> topic | ||||||
| var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||||||
| 	ncutils.Log("received message to update peers " + string(msg.Payload())) | 	ncutils.Log("received message to update peers " + string(msg.Payload())) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | //NodeUpdate -- mqtt message handler for /update/keys/<NodeID> topic | ||||||
| var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | var UpdateKeys mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||||||
| 	ncutils.Log("received message to update keys " + string(msg.Payload())) | 	ncutils.Log("received message to update keys " + string(msg.Payload())) | ||||||
| } | } | ||||||
|  |  | ||||||
| func Checkin(client mqtt.Client, network string) { | //Checkin  -- go routine that checks for public or local ip changes, publishes changes | ||||||
| 	var cfg config.ClientConfig | //   if there are no updates, simply "pings" the server as a checkin | ||||||
| 	cfg.Network = network | func Checkin(ctx context.Context, cfg config.ClientConfig, network string) { | ||||||
| 	cfg.ReadConfig() | 	select { | ||||||
| 	for { | 	case <-ctx.Done(): | ||||||
| 		time.Sleep(time.Duration(cfg.Node.NetworkSettings.DefaultCheckInInterval) * time.Second) | 		ncutils.Log("Checkin cancelled") | ||||||
|  | 		return | ||||||
|  | 		//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ?? | ||||||
|  | 	case <-time.After(time.Second * 10): | ||||||
| 		ncutils.Log("Checkin running") | 		ncutils.Log("Checkin running") | ||||||
| 		if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" { | 		if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" { | ||||||
| 			extIP, err := ncutils.GetPublicIP() | 			extIP, err := ncutils.GetPublicIP() | ||||||
| @@ -87,7 +107,7 @@ func Checkin(client mqtt.Client, network string) { | |||||||
| 			} | 			} | ||||||
| 			if cfg.Node.Endpoint != extIP && extIP != "" { | 			if cfg.Node.Endpoint != extIP && extIP != "" { | ||||||
| 				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1) | 				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1) | ||||||
| 				UpdateEndpoint(client, network, extIP) | 				UpdateEndpoint(cfg, network, extIP) | ||||||
| 			} | 			} | ||||||
| 			intIP, err := getPrivateAddr() | 			intIP, err := getPrivateAddr() | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| @@ -95,7 +115,7 @@ func Checkin(client mqtt.Client, network string) { | |||||||
| 			} | 			} | ||||||
| 			if cfg.Node.LocalAddress != intIP && intIP != "" { | 			if cfg.Node.LocalAddress != intIP && intIP != "" { | ||||||
| 				ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1) | 				ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1) | ||||||
| 				UpdateLocalAddress(client, network, intIP) | 				UpdateLocalAddress(cfg, network, intIP) | ||||||
| 			} | 			} | ||||||
| 		} else { | 		} else { | ||||||
| 			localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange) | 			localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange) | ||||||
| @@ -104,31 +124,52 @@ func Checkin(client mqtt.Client, network string) { | |||||||
| 			} | 			} | ||||||
| 			if cfg.Node.Endpoint != localIP && localIP != "" { | 			if cfg.Node.Endpoint != localIP && localIP != "" { | ||||||
| 				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1) | 				ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1) | ||||||
| 				UpdateEndpoint(client, network, localIP) | 				UpdateEndpoint(cfg, network, localIP) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		Ping(client, network) | 		Hello(cfg, network) | ||||||
|  | 		ncutils.Log("Checkin complete") | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func Ping(client mqtt.Client, network string) { | //UpdateEndpoint -- publishes an endpoint update to broker | ||||||
| 	var cfg config.ClientConfig | func UpdateEndpoint(cfg config.ClientConfig, network, ip string) { | ||||||
| 	cfg.Network = network | 	ncutils.Log("Updating endpoint") | ||||||
| 	cfg.ReadConfig() | 	client := SetupMQTT(cfg) | ||||||
| 	if token := client.Publish("ping/"+network+"/"+cfg.Node.ID, 0, false, []byte("ping")); token.Wait() && token.Error() != nil { | 	if token := client.Publish("update/ip/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil { | ||||||
|  | 		ncutils.Log("error publishing endpoint update " + token.Error().Error()) | ||||||
|  | 	} | ||||||
|  | 	client.Disconnect(250) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | //UpdateLocalAddress -- publishes a local address update to broker | ||||||
|  | func UpdateLocalAddress(cfg config.ClientConfig, network, ip string) { | ||||||
|  | 	ncutils.Log("Updating local address") | ||||||
|  | 	client := SetupMQTT(cfg) | ||||||
|  | 	if token := client.Publish("update/localaddress/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil { | ||||||
|  | 		ncutils.Log("error publishing local address update " + token.Error().Error()) | ||||||
|  | 	} | ||||||
|  | 	client.Disconnect(250) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | //Hello -- ping the broker to let server know node is alive and doing fine | ||||||
|  | func Hello(cfg config.ClientConfig, network string) { | ||||||
|  | 	client := SetupMQTT(cfg) | ||||||
|  | 	if token := client.Publish("ping/"+network+"/"+cfg.Node.ID, 0, false, "hello world!"); token.Wait() && token.Error() != nil { | ||||||
| 		ncutils.Log("error publishing ping " + token.Error().Error()) | 		ncutils.Log("error publishing ping " + token.Error().Error()) | ||||||
| 	} | 	} | ||||||
|  | 	client.Disconnect(250) | ||||||
| } | } | ||||||
|  |  | ||||||
| func Metrics(client mqtt.Client, network string) { | //Metics --  go routine that collects wireguard metrics and publishes to broker | ||||||
| 	if token := client.Connect(); token.Wait() && token.Error() != nil { | func Metrics(ctx context.Context, cfg config.ClientConfig, network string) { | ||||||
| 		log.Fatal(token.Error()) | 	select { | ||||||
| 	} | 	case <-ctx.Done(): | ||||||
| 	var cfg config.ClientConfig | 		ncutils.Log("Metrics collection cancelled") | ||||||
| 	cfg.Network = network | 		return | ||||||
| 	cfg.ReadConfig() | 		//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ?? | ||||||
| 	for { | 	case <-time.After(time.Second * 60): | ||||||
| 		time.Sleep(time.Second * 60) | 		ncutils.Log("Metrics collection running") | ||||||
| 		ncutils.Log("Metrics running") | 		ncutils.Log("Metrics running") | ||||||
| 		wg, err := wgctrl.New() | 		wg, err := wgctrl.New() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @@ -145,70 +186,12 @@ func Metrics(client mqtt.Client, network string) { | |||||||
| 			ncutils.Log("error marshaling peers " + err.Error()) | 			ncutils.Log("error marshaling peers " + err.Error()) | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 		if token := client.Publish("metrics/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil { | 		client := SetupMQTT(cfg) | ||||||
|  | 		if token := client.Publish("metrics/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil { | ||||||
| 			ncutils.Log("error publishing metrics " + token.Error().Error()) | 			ncutils.Log("error publishing metrics " + token.Error().Error()) | ||||||
| 			break |  | ||||||
| 		} | 		} | ||||||
| 		wg.Close() | 		wg.Close() | ||||||
|  | 		client.Disconnect(250) | ||||||
|  | 		ncutils.Log("metrics collection complete") | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| type PingStat struct { |  | ||||||
| 	Name      string |  | ||||||
| 	Reachable bool |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func Connectivity(client mqtt.Client, network string) { |  | ||||||
| 	if token := client.Connect(); token.Wait() && token.Error() != nil { |  | ||||||
| 		log.Fatal(token.Error()) |  | ||||||
| 	} |  | ||||||
| 	var cfg config.ClientConfig |  | ||||||
| 	cfg.Network = network |  | ||||||
| 	cfg.ReadConfig() |  | ||||||
| 	for { |  | ||||||
| 		time.Sleep(time.Duration(cfg.NetworkSettings.DefaultCheckInInterval) * time.Second) |  | ||||||
| 		ncutils.Log("Connectivity running") |  | ||||||
| 		var pingStats []PingStat |  | ||||||
| 		peers, err := ncutils.GetPeers(cfg.Node.Interface) |  | ||||||
| 		if err != nil { |  | ||||||
| 			ncutils.Log("error retriving peers " + err.Error()) |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 		for _, peer := range peers { |  | ||||||
| 			var pingStat PingStat |  | ||||||
| 			pingStat.Name = peer.PublicKey.String() |  | ||||||
| 			pingStat.Reachable = true |  | ||||||
| 			ip := peer.Endpoint.IP.String() |  | ||||||
| 			fmt.Println("----------", peer.Endpoint.IP, ip) |  | ||||||
| 			pinger, err := ping.NewPinger(ip) |  | ||||||
| 			if err != nil { |  | ||||||
| 				ncutils.Log("error creating pinger " + err.Error()) |  | ||||||
| 				break |  | ||||||
| 			} |  | ||||||
| 			pinger.Timeout = 2 * time.Second |  | ||||||
| 			pinger.Run() |  | ||||||
| 			stats := pinger.Statistics() |  | ||||||
| 			if stats.PacketLoss == 100 { |  | ||||||
| 				pingStat.Reachable = false |  | ||||||
| 			} |  | ||||||
| 			pingStats = append(pingStats, pingStat) |  | ||||||
| 		} |  | ||||||
| 		bytes, err := json.Marshal(pingStats) |  | ||||||
| 		if err != nil { |  | ||||||
| 			ncutils.Log("error marshaling stats" + err.Error()) |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 		if token := client.Publish("connectivity/"+network+"/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil { |  | ||||||
| 			ncutils.Log("error publishing ping stats " + token.Error().Error()) |  | ||||||
| 			break |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func UpdateEndpoint(client mqtt.Client, network, ip string) { |  | ||||||
| 	ncutils.Log("Updating endpoint") |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func UpdateLocalAddress(client mqtt.Client, network, ip string) { |  | ||||||
| 	ncutils.Log("Updating local address") |  | ||||||
| } |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Matthew R Kasun
					Matthew R Kasun