refactor to reduce number of types of message in message queue simplifying the subscribers and publishers

This commit is contained in:
Matthew R Kasun
2022-01-21 05:19:17 -05:00
parent d4399d1321
commit 2310977391
9 changed files with 215 additions and 256 deletions

View File

@@ -72,6 +72,7 @@ type ServerConfig struct {
DisplayKeys string `yaml:"displaykeys"` DisplayKeys string `yaml:"displaykeys"`
AzureTenant string `yaml:"azuretenant"` AzureTenant string `yaml:"azuretenant"`
RCE string `yaml:"rce"` RCE string `yaml:"rce"`
Debug bool `yaml:"debug"`
} }
// SQLConfig - Generic SQL Config // SQLConfig - Generic SQL Config

View File

@@ -0,0 +1,2 @@
10.0.0.1 testnode.skynet
10.0.0.2 myhost.skynet

122
logic/peers.go Normal file
View File

@@ -0,0 +1,122 @@
package logic
import (
"log"
"net"
"strconv"
"strings"
"time"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/ncutils"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) {
var peerUpdate models.PeerUpdate
var peers []wgtypes.PeerConfig
networkNodes, err := GetNetworkNodes(node.Network)
if err != nil {
return models.PeerUpdate{}, err
}
for _, peer := range networkNodes {
if peer.ID == node.ID {
//skip yourself
continue
}
pubkey, err := wgtypes.ParseKey(peer.PublicKey)
if err != nil {
return models.PeerUpdate{}, err
}
if node.Endpoint == peer.Endpoint {
//peer is on same network
if node.LocalAddress != peer.LocalAddress && peer.LocalAddress != "" {
peer.Endpoint = peer.LocalAddress
} else {
continue
}
}
endpoint := peer.Endpoint + ":" + strconv.FormatInt(int64(peer.ListenPort), 10)
address, err := net.ResolveUDPAddr("udp", endpoint)
if err != nil {
return models.PeerUpdate{}, err
}
allowedips := GetAllowedIPs(node, &peer)
var keepalive time.Duration
if node.PersistentKeepalive != 0 {
keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(node.PersistentKeepalive), 10) + "s")
}
var peerData = wgtypes.PeerConfig{
PublicKey: pubkey,
Endpoint: address,
ReplaceAllowedIPs: true,
AllowedIPs: allowedips,
PersistentKeepaliveInterval: &keepalive,
}
peers = append(peers, peerData)
}
peerUpdate.Network = node.Network
peerUpdate.Peers = peers
return peerUpdate, nil
}
func GetAllowedIPs(node, peer *models.Node) []net.IPNet {
var allowedips []net.IPNet
var gateways []string
var peeraddr = net.IPNet{
IP: net.ParseIP(peer.Address),
Mask: net.CIDRMask(32, 32),
}
dualstack := false
allowedips = append(allowedips, peeraddr)
// handle manually set peers
for _, allowedIp := range node.AllowedIPs {
if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil {
nodeEndpointArr := strings.Split(node.Endpoint, ":")
if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != node.Address { // don't need to add an allowed ip that already exists..
allowedips = append(allowedips, *ipnet)
}
} else if appendip := net.ParseIP(allowedIp); appendip != nil && allowedIp != node.Address {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, ipnet)
}
}
// handle egress gateway peers
if node.IsEgressGateway == "yes" {
//hasGateway = true
ranges := node.EgressGatewayRanges
for _, iprange := range ranges { // go through each cidr for egress gateway
_, ipnet, err := net.ParseCIDR(iprange) // confirming it's valid cidr
if err != nil {
ncutils.PrintLog("could not parse gateway IP range. Not adding "+iprange, 1)
continue // if can't parse CIDR
}
nodeEndpointArr := strings.Split(node.Endpoint, ":") // getting the public ip of node
if ipnet.Contains(net.ParseIP(nodeEndpointArr[0])) { // ensuring egress gateway range does not contain public ip of node
ncutils.PrintLog("egress IP range of "+iprange+" overlaps with "+node.Endpoint+", omitting", 2)
continue // skip adding egress range if overlaps with node's ip
}
if ipnet.Contains(net.ParseIP(node.LocalAddress)) { // ensuring egress gateway range does not contain public ip of node
ncutils.PrintLog("egress IP range of "+iprange+" overlaps with "+node.LocalAddress+", omitting", 2)
continue // skip adding egress range if overlaps with node's local ip
}
gateways = append(gateways, iprange)
if err != nil {
log.Println("ERROR ENCOUNTERED SETTING GATEWAY")
} else {
allowedips = append(allowedips, *ipnet)
}
}
}
if node.Address6 != "" && dualstack {
var addr6 = net.IPNet{
IP: net.ParseIP(node.Address6),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, addr6)
}
return allowedips
}

46
main.go
View File

@@ -10,6 +10,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"syscall" "syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/auth" "github.com/gravitl/netmaker/auth"
@@ -118,16 +119,16 @@ func startControllers() {
logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.") logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.")
} }
//if servercfg.IsClientMode() == "on" { if servercfg.IsClientMode() == "on" {
// var checkintime = time.Duration(servercfg.GetServerCheckinInterval()) * time.Second var checkintime = time.Duration(servercfg.GetServerCheckinInterval()) * time.Second
// for { // best effort currently for { // best effort currently
// var serverGroup sync.WaitGroup var serverGroup sync.WaitGroup
// serverGroup.Add(1) serverGroup.Add(1)
// go runClient(&serverGroup) go runClient(&serverGroup)
// serverGroup.Wait() serverGroup.Wait()
// time.Sleep(checkintime) time.Sleep(checkintime)
// } }
//} }
waitnetwork.Wait() waitnetwork.Wait()
} }
@@ -191,34 +192,25 @@ func runMessageQueue(wg *sync.WaitGroup) {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
opts.AddBroker(servercfg.GetMessageQueueEndpoint()) opts.AddBroker(servercfg.GetMessageQueueEndpoint())
logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint()) logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint())
opts.SetDefaultPublishHandler(mq.DefaultHandler)
client := mqtt.NewClient(opts) client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil { if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Log(0, "unable to connect to message queue broker, closing down") logger.Log(0, "unable to connect to message queue broker, closing down")
return return
} }
//Set up Subscriptions //Set up Subscriptions
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { if servercfg.GetDebug() {
//should make constant for disconnect wait period if token := client.Subscribe("#", 2, mq.DefaultHandler); token.Wait() && token.Error() != nil {
client.Disconnect(250) client.Disconnect(240)
logger.Log(0, "could not subscribe to message queue ...") logger.Log(0, "default subscription failed")
return }
} }
if token := client.Subscribe("ping/#", 2, mq.Ping); token.Wait() && token.Error() != nil { if token := client.Subscribe("ping/#", 2, mq.Ping); token.Wait() && token.Error() != nil {
client.Disconnect(240) client.Disconnect(240)
logger.Log(0, "ping sub failed") logger.Log(0, "ping subscription failed")
} }
if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil { if token := client.Subscribe("update/#", 0, mq.UpdateNode); token.Wait() && token.Error() != nil {
client.Disconnect(240) client.Disconnect(240)
logger.Log(0, "metrics sub failed") logger.Log(0, "node update subscription failed")
}
if token := client.Subscribe("update/ip/#", 0, mq.IPUpdate); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
}
if token := client.Subscribe("update/publickey/#", 0, mq.PublicKeyUpdate); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
} }
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, os.Interrupt) signal.Notify(quit, syscall.SIGTERM, os.Interrupt)

223
mq/mq.go
View File

@@ -3,26 +3,22 @@ package mq
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"log"
"net"
"strconv"
"strings" "strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
// default message handler - only called in GetDebug == true
var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload())) logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload()))
} }
// Ping message Handler -- handles ping topic from client nodes
var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "Ping Handler: "+msg.Topic()) logger.Log(0, "Ping Handler: "+msg.Topic())
go func() { go func() {
@@ -53,211 +49,64 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
}() }()
} }
var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // UpdateNode message Handler -- handles updates from client nodes
logger.Log(0, "PublicKey Handler") var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
go func() { go func() {
logger.Log(0, "public key update "+msg.Topic())
key := string(msg.Payload())
id, err := GetID(msg.Topic()) id, err := GetID(msg.Topic())
if err != nil { if err != nil {
logger.Log(0, "error getting node.ID sent on "+msg.Topic()+" "+err.Error()) logger.Log(1, "error getting node.ID sent on "+msg.Topic()+" "+err.Error())
return
} }
node, err := logic.GetNodeByID(id) logger.Log(1, "Update Node Handler"+id)
var newNode models.Node
if err := json.Unmarshal(msg.Payload(), &newNode); err != nil {
logger.Log(1, "error unmarshaling payload "+err.Error())
return
}
currentNode, err := logic.GetNodeByID(newNode.ID)
if err != nil { if err != nil {
logger.Log(0, "error retrieving node "+msg.Topic()+" "+err.Error()) logger.Log(1, "error getting node "+newNode.ID+" "+err.Error())
return
} }
node.PublicKey = key if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
node.SetLastCheckIn() logger.Log(1, "error saving node"+err.Error())
if err := logic.UpdateNode(&node, &node); err != nil {
logger.Log(0, "error updating node "+err.Error())
} }
if err := UpdatePeers(client, node); err != nil { if logic.ShouldPeersUpdate(&currentNode, &newNode) {
logger.Log(0, "error updating peers "+err.Error()) if err := PublishPeerUpdate(client, &newNode); err != nil {
logger.Log(1, "error publishing peer update "+err.Error())
return
}
} }
}() }()
} }
var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
go func() { func PublishPeerUpdate(client mqtt.Client, newNode *models.Node) error {
ip := string(msg.Payload()) networkNodes, err := logic.GetNetworkNodes(newNode.Network)
logger.Log(0, "IPUpdate Handler")
id, err := GetID(msg.Topic())
logger.Log(0, "ipUpdate recieved from "+id)
if err != nil {
logger.Log(0, "error getting node.ID sent on update/ip topic ")
return
}
node, err := logic.GetNodeByID(id)
if err != nil {
logger.Log(0, "invalid ID recieved on update/ip topic: "+err.Error())
return
}
node.Endpoint = ip
node.SetLastCheckIn()
if err := logic.UpdateNode(&node, &node); err != nil {
logger.Log(0, "error updating node "+err.Error())
}
if err != UpdatePeers(client, node) {
logger.Log(0, "error updating peers "+err.Error())
}
}()
}
func UpdatePeers(client mqtt.Client, newnode models.Node) error {
networkNodes, err := logic.GetNetworkNodes(newnode.Network)
if err != nil { if err != nil {
logger.Log(1, "err getting Network Nodes"+err.Error())
return err return err
} }
dualstack := false
var keepalive time.Duration
//keepalive = time.Duration{}
if newnode.PersistentKeepalive != 0 {
keepalive, _ = time.ParseDuration(strconv.FormatInt(int64(newnode.PersistentKeepalive), 10) + "s")
}
for _, node := range networkNodes { for _, node := range networkNodes {
var peers []wgtypes.PeerConfig peerUpdate, err := logic.GetPeerUpdate(&node)
var peerUpdate models.PeerUpdate if err != nil {
var gateways []string logger.Log(1, "error getting peer update for node "+node.ID+" "+err.Error())
continue
for _, peer := range networkNodes {
if peer.ID == node.ID {
//skip
continue
}
var allowedips []net.IPNet
var peeraddr = net.IPNet{
IP: net.ParseIP(peer.Address),
Mask: net.CIDRMask(32, 32),
}
//hasGateway := false
pubkey, err := wgtypes.ParseKey(peer.PublicKey)
if err != nil {
return err
}
if node.Endpoint == peer.Endpoint {
if node.LocalAddress != peer.LocalAddress && peer.LocalAddress != "" {
peer.Endpoint = peer.LocalAddress
} else {
continue
}
}
endpoint := peer.Endpoint + ":" + strconv.Itoa(int(peer.ListenPort))
//fmt.Println("endpoint: ", endpoint, peer.Endpoint, peer.ListenPort)
address, err := net.ResolveUDPAddr("udp", endpoint)
if err != nil {
return err
}
//calculate Allowed IPs.
allowedips = append(allowedips, peeraddr)
// handle manually set peers
for _, allowedIp := range node.AllowedIPs {
if _, ipnet, err := net.ParseCIDR(allowedIp); err == nil {
nodeEndpointArr := strings.Split(node.Endpoint, ":")
if !ipnet.Contains(net.IP(nodeEndpointArr[0])) && ipnet.IP.String() != node.Address { // don't need to add an allowed ip that already exists..
allowedips = append(allowedips, *ipnet)
}
} else if appendip := net.ParseIP(allowedIp); appendip != nil && allowedIp != node.Address {
ipnet := net.IPNet{
IP: net.ParseIP(allowedIp),
Mask: net.CIDRMask(32, 32),
}
allowedips = append(allowedips, ipnet)
}
}
// handle egress gateway peers
if node.IsEgressGateway == "yes" {
//hasGateway = true
ranges := node.EgressGatewayRanges
for _, iprange := range ranges { // go through each cidr for egress gateway
_, ipnet, err := net.ParseCIDR(iprange) // confirming it's valid cidr
if err != nil {
ncutils.PrintLog("could not parse gateway IP range. Not adding "+iprange, 1)
continue // if can't parse CIDR
}
nodeEndpointArr := strings.Split(node.Endpoint, ":") // getting the public ip of node
if ipnet.Contains(net.ParseIP(nodeEndpointArr[0])) { // ensuring egress gateway range does not contain public ip of node
ncutils.PrintLog("egress IP range of "+iprange+" overlaps with "+node.Endpoint+", omitting", 2)
continue // skip adding egress range if overlaps with node's ip
}
if ipnet.Contains(net.ParseIP(node.LocalAddress)) { // ensuring egress gateway range does not contain public ip of node
ncutils.PrintLog("egress IP range of "+iprange+" overlaps with "+node.LocalAddress+", omitting", 2)
continue // skip adding egress range if overlaps with node's local ip
}
gateways = append(gateways, iprange)
if err != nil {
log.Println("ERROR ENCOUNTERED SETTING GATEWAY")
} else {
allowedips = append(allowedips, *ipnet)
}
}
}
var peerData wgtypes.PeerConfig
if node.Address6 != "" && dualstack {
var addr6 = net.IPNet{
IP: net.ParseIP(node.Address6),
Mask: net.CIDRMask(128, 128),
}
allowedips = append(allowedips, addr6)
}
if &keepalive == nil {
peerData = wgtypes.PeerConfig{
PublicKey: pubkey,
Endpoint: address,
ReplaceAllowedIPs: true,
AllowedIPs: allowedips,
}
} else {
peerData = wgtypes.PeerConfig{
PublicKey: pubkey,
PersistentKeepaliveInterval: &keepalive,
Endpoint: address,
ReplaceAllowedIPs: true,
AllowedIPs: allowedips,
}
}
peers = append(peers, peerData)
} }
peerUpdate.Network = node.Network
peerUpdate.Peers = peers
data, err := json.Marshal(&peerUpdate) data, err := json.Marshal(&peerUpdate)
if err != nil { if err != nil {
logger.Log(0, "error marshaling peer update "+err.Error()) logger.Log(2, "error marshaling peer update "+err.Error())
return err return err
} }
if token := client.Publish("/update/peers/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil { if token := client.Publish("/update/peers"+node.ID, 0, false, data); token.Wait() && token.Error() != nil {
logger.Log(0, "error sending peer updatte to no") logger.Log(2, "error publishing peer update to peer "+node.ID+" "+token.Error().Error())
return err return err
} }
} }
return nil return nil
} }
var LocalAddressUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // GetID -- decodes a message queue topic and returns the embedded node.ID
logger.Log(0, "LocalAddressUpdate Handler")
go func() {
logger.Log(0, "LocalAddressUpdate handler")
id, err := GetID(msg.Topic())
if err != nil {
logger.Log(0, "error getting node.ID "+msg.Topic())
return
}
node, err := logic.GetNodeByID(id)
if err != nil {
logger.Log(0, "error get node "+msg.Topic())
return
}
node.LocalAddress = string(msg.Payload())
node.SetLastCheckIn()
if err := logic.UpdateNode(&node, &node); err != nil {
logger.Log(0, "error updating node "+err.Error())
}
if err := UpdatePeers(client, node); err != nil {
logger.Log(0, "error updating peers "+err.Error())
}
}()
}
func GetID(topic string) (string, error) { func GetID(topic string) (string, error) {
parts := strings.Split(topic, "/") parts := strings.Split(topic, "/")
count := len(parts) count := len(parts)
@@ -268,6 +117,7 @@ func GetID(topic string) (string, error) {
return parts[count-1], nil return parts[count-1], nil
} }
// NewPeer -- publishes a peer update to all the peers of a newNode
func NewPeer(node models.Node) error { func NewPeer(node models.Node) error {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
broker := servercfg.GetMessageQueueEndpoint() broker := servercfg.GetMessageQueueEndpoint()
@@ -277,8 +127,7 @@ func NewPeer(node models.Node) error {
if token := client.Connect(); token.Wait() && token.Error() != nil { if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error() return token.Error()
} }
if err := PublishPeerUpdate(client, &node); err != nil {
if err := UpdatePeers(client, node); err != nil {
return err return err
} }
return nil return nil

View File

@@ -18,7 +18,7 @@ func Join(cfg config.ClientConfig, privateKey string) error {
var err error var err error
err = functions.JoinNetwork(cfg, privateKey) err = functions.JoinNetwork(cfg, privateKey)
if err != nil && !cfg.DebugJoin { if err != nil && !cfg.DebugOn {
if !strings.Contains(err.Error(), "ALREADY_INSTALLED") { if !strings.Contains(err.Error(), "ALREADY_INSTALLED") {
ncutils.PrintLog("error installing: "+err.Error(), 1) ncutils.PrintLog("error installing: "+err.Error(), 1)
err = functions.LeaveNetwork(cfg.Network) err = functions.LeaveNetwork(cfg.Network)

View File

@@ -29,7 +29,7 @@ type ClientConfig struct {
Network string `yaml:"network"` Network string `yaml:"network"`
Daemon string `yaml:"daemon"` Daemon string `yaml:"daemon"`
OperatingSystem string `yaml:"operatingsystem"` OperatingSystem string `yaml:"operatingsystem"`
DebugJoin bool `yaml:"debugjoin"` DebugOn bool `yaml:"debugon"`
} }
// ServerConfig - struct for dealing with the server information for a netclient // ServerConfig - struct for dealing with the server information for a netclient

View File

@@ -38,7 +38,7 @@ func Daemon() error {
} }
// SetupMQTT creates a connection to broker and return client // SetupMQTT creates a connection to broker and return client
func SetupMQTT(cfg config.ClientConfig) mqtt.Client { func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883") ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883")
opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883") opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883")
@@ -53,13 +53,15 @@ func SetupMQTT(cfg config.ClientConfig) mqtt.Client {
// MessageQueue sets up Message Queue and subsribes/publishes updates to/from server // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server
func MessageQueue(ctx context.Context, network string) { func MessageQueue(ctx context.Context, network string) {
ncutils.Log("netclient go routine started for " + network) ncutils.Log("netclient go routine started for " + network)
var cfg config.ClientConfig var cfg *config.ClientConfig
cfg.Network = network cfg.Network = network
cfg.ReadConfig() cfg.ReadConfig()
ncutils.Log("daemon started for network:" + network) ncutils.Log("daemon started for network:" + network)
client := SetupMQTT(cfg) client := SetupMQTT(cfg)
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { if cfg.DebugOn {
log.Fatal(token.Error()) if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
} }
if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil {
log.Fatal(token.Error()) log.Fatal(token.Error())
@@ -67,12 +69,6 @@ func MessageQueue(ctx context.Context, network string) {
if token := client.Subscribe("/update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil { if token := client.Subscribe("/update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil {
log.Fatal(token.Error()) log.Fatal(token.Error())
} }
//addroute doesn't seem to work consistently
//client.AddRoute("update/"+cfg.Node.ID, NodeUpdate)
//client.AddRoute("update/peers/"+cfg.Node.ID, UpdatePeers)
//handle key updates in node update
//client.AddRoute("update/keys/"+cfg.Node.ID, UpdateKeys)
defer client.Disconnect(250) defer client.Disconnect(250)
go Checkin(ctx, cfg, network) go Checkin(ctx, cfg, network)
<-ctx.Done() <-ctx.Done()
@@ -135,8 +131,9 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
ncutils.Log("error updating wireguard config " + err.Error()) ncutils.Log("error updating wireguard config " + err.Error())
return return
} }
// path hardcoded for now... should be updated file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf"
err = wireguard.ApplyWGQuickConf("/etc/netclient/config/" + cfg.Node.Interface + ".conf") ncutils.Log("applyWGQuickConf to " + file)
err = wireguard.ApplyWGQuickConf(file)
if err != nil { if err != nil {
ncutils.Log("error restarting wg after node update " + err.Error()) ncutils.Log("error restarting wg after node update " + err.Error())
return return
@@ -199,7 +196,7 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) (*config.ClientCon
// Checkin -- go routine that checks for public or local ip changes, publishes changes // Checkin -- go routine that checks for public or local ip changes, publishes changes
// if there are no updates, simply "pings" the server as a checkin // if there are no updates, simply "pings" the server as a checkin
func Checkin(ctx context.Context, cfg config.ClientConfig, network string) { func Checkin(ctx context.Context, cfg *config.ClientConfig, network string) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -217,7 +214,8 @@ func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
} }
if cfg.Node.Endpoint != extIP && extIP != "" { if cfg.Node.Endpoint != extIP && extIP != "" {
ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1) ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+extIP, 1)
UpdateEndpoint(cfg, network, extIP) cfg.Node.Endpoint = extIP
PublishNodeUpdate(cfg)
} }
intIP, err := getPrivateAddr() intIP, err := getPrivateAddr()
if err != nil { if err != nil {
@@ -225,7 +223,8 @@ func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
} }
if cfg.Node.LocalAddress != intIP && intIP != "" { if cfg.Node.LocalAddress != intIP && intIP != "" {
ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1) ncutils.PrintLog("local Address has changed from "+cfg.Node.LocalAddress+" to "+intIP, 1)
UpdateLocalAddress(cfg, network, intIP) cfg.Node.LocalAddress = intIP
PublishNodeUpdate(cfg)
} }
} else { } else {
localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange) localIP, err := ncutils.GetLocalIP(cfg.Node.LocalRange)
@@ -234,7 +233,8 @@ func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
} }
if cfg.Node.Endpoint != localIP && localIP != "" { if cfg.Node.Endpoint != localIP && localIP != "" {
ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1) ncutils.PrintLog("endpoint has changed from "+cfg.Node.Endpoint+" to "+localIP, 1)
UpdateEndpoint(cfg, network, localIP) cfg.Node.Endpoint = localIP
PublishNodeUpdate(cfg)
} }
} }
Hello(cfg, network) Hello(cfg, network)
@@ -243,37 +243,24 @@ func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
} }
} }
// UpdateEndpoint -- publishes an endpoint update to broker // PublishNodeUpdates -- saves node and pushes changes to broker
func UpdateEndpoint(cfg config.ClientConfig, network, ip string) { func PublishNodeUpdate(cfg *config.ClientConfig) {
ncutils.Log("Updating endpoint") if err := config.Write(cfg, cfg.Network); err != nil {
ncutils.Log("error saving configuration" + err.Error())
}
client := SetupMQTT(cfg) client := SetupMQTT(cfg)
if token := client.Publish("update/ip/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil { data, err := json.Marshal(cfg.Node)
if err != nil {
ncutils.Log("error marshling node update " + err.Error())
}
if token := client.Publish("update/"+cfg.Node.ID, 0, false, data); token.Wait() && token.Error() != nil {
ncutils.Log("error publishing endpoint update " + token.Error().Error()) ncutils.Log("error publishing endpoint update " + token.Error().Error())
} }
cfg.Node.Endpoint = ip
if err := config.Write(&cfg, cfg.Network); err != nil {
ncutils.Log("error updating local config " + err.Error())
}
client.Disconnect(250)
}
// UpdateLocalAddress -- publishes a local address update to broker
func UpdateLocalAddress(cfg config.ClientConfig, network, ip string) {
ncutils.Log("Updating local address")
client := SetupMQTT(cfg)
if token := client.Publish("update/localaddress/"+cfg.Node.ID, 0, false, ip); token.Wait() && token.Error() != nil {
ncutils.Log("error publishing local address update " + token.Error().Error())
}
cfg.Node.LocalAddress = ip
ncutils.Log("updating local address in local config to: " + cfg.Node.LocalAddress)
if err := config.Write(&cfg, cfg.Network); err != nil {
ncutils.Log("error updating local config " + err.Error())
}
client.Disconnect(250) client.Disconnect(250)
} }
// Hello -- ping the broker to let server know node is alive and doing fine // Hello -- ping the broker to let server know node is alive and doing fine
func Hello(cfg config.ClientConfig, network string) { func Hello(cfg *config.ClientConfig, network string) {
client := SetupMQTT(cfg) client := SetupMQTT(cfg)
ncutils.Log("sending ping " + cfg.Node.ID) ncutils.Log("sending ping " + cfg.Node.ID)
if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil { if token := client.Publish("ping/"+cfg.Node.ID, 2, false, "hello world!"); token.Wait() && token.Error() != nil {

View File

@@ -85,6 +85,7 @@ func GetServerConfig() config.ServerConfig {
} else { } else {
cfg.RCE = "off" cfg.RCE = "off"
} }
cfg.Debug = GetDebug()
return cfg return cfg
} }
@@ -565,3 +566,8 @@ func getMacAddr() string {
func GetRce() bool { func GetRce() bool {
return os.Getenv("RCE") == "on" || config.Config.Server.RCE == "on" return os.Getenv("RCE") == "on" || config.Config.Server.RCE == "on"
} }
// GetDebug -- checks if debugging is enabled, off by default
func GetDebug() bool {
return os.Getenv("DEBUG") == "on" || config.Config.Server.Debug == true
}