use uuid in message queue functions

This commit is contained in:
Matthew R Kasun
2022-01-14 15:35:15 -05:00
parent aa6f7a138b
commit ec486addf0
3 changed files with 18 additions and 42 deletions

3
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/go-playground/validator/v10 v10.10.0 github.com/go-playground/validator/v10 v10.10.0
github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang-jwt/jwt/v4 v4.2.0
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1 github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/lib/pq v1.10.4 github.com/lib/pq v1.10.4
@@ -40,7 +41,7 @@ require (
github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/google/go-cmp v0.5.5 // indirect github.com/google/go-cmp v0.5.5 // indirect
github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect
github.com/mdlayher/genetlink v1.0.0 // indirect github.com/mdlayher/genetlink v1.0.0 // indirect

View File

@@ -24,16 +24,15 @@ var Metrics mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "Ping Handler: "+msg.Topic()) logger.Log(0, "Ping Handler: "+msg.Topic())
go func() { go func() {
mac, net, err := GetMacNetwork(msg.Topic()) id, err := GetID(msg.Topic())
if err != nil { if err != nil {
logger.Log(0, "error getting node.ID sent on ping topic ") logger.Log(0, "error getting node.ID sent on ping topic ")
return return
} }
logger.Log(0, "ping recieved from "+mac+" on net "+net) node, err := logic.GetNodeByID(id)
node, err := logic.GetNodeByMacAddress(net, mac)
if err != nil { if err != nil {
logger.Log(0, "mq-ping error getting node: "+err.Error()) logger.Log(0, "mq-ping error getting node: "+err.Error())
record, err := database.FetchRecord(database.NODES_TABLE_NAME, mac+"###"+net) record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
if err != nil { if err != nil {
logger.Log(0, "error reading database ", err.Error()) logger.Log(0, "error reading database ", err.Error())
return return
@@ -54,11 +53,11 @@ var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Mess
go func() { go func() {
logger.Log(0, "public key update "+msg.Topic()) logger.Log(0, "public key update "+msg.Topic())
key := string(msg.Payload()) key := string(msg.Payload())
mac, network, err := GetMacNetwork(msg.Topic()) id, err := GetID(msg.Topic())
if err != nil { if err != nil {
logger.Log(0, "error getting node.ID sent on "+msg.Topic()+" "+err.Error()) logger.Log(0, "error getting node.ID sent on "+msg.Topic()+" "+err.Error())
} }
node, err := logic.GetNode(mac, network) node, err := logic.GetNodeByID(id)
if err != nil { if err != nil {
logger.Log(0, "error retrieving node "+msg.Topic()+" "+err.Error()) logger.Log(0, "error retrieving node "+msg.Topic()+" "+err.Error())
} }
@@ -74,13 +73,13 @@ var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
go func() { go func() {
ip := string(msg.Payload()) ip := string(msg.Payload())
logger.Log(0, "IPUpdate Handler") logger.Log(0, "IPUpdate Handler")
mac, network, err := GetMacNetwork(msg.Topic()) id, err := GetID(msg.Topic())
logger.Log(0, "ipUpdate recieved from "+mac+" on net "+network) logger.Log(0, "ipUpdate recieved from "+id)
if err != nil { if err != nil {
logger.Log(0, "error getting node.ID sent on update/ip topic ") logger.Log(0, "error getting node.ID sent on update/ip topic ")
return return
} }
node, err := logic.GetNode(mac, network) node, err := logic.GetNodeByID(id)
if err != nil { if err != nil {
logger.Log(0, "invalid ID recieved on update/ip topic: "+err.Error()) logger.Log(0, "invalid ID recieved on update/ip topic: "+err.Error())
return return
@@ -110,7 +109,8 @@ func UpdatePeers(client mqtt.Client, node models.Node) error {
continue continue
} }
peerUpdate.Nodes = append(peerUpdate.Nodes, peer) peerUpdate.Nodes = append(peerUpdate.Nodes, peer)
peerUpdate.ExtPeers, err = logic.GetExtPeersList(node.MacAddress, node.Network) peerUpdate.ExtPeers, err = logic.GetExtPeersList(&node)
if err != nil { if err != nil {
logger.Log(0) logger.Log(0)
} }
@@ -150,12 +150,12 @@ var LocalAddressUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.M
logger.Log(0, "LocalAddressUpdate Handler") logger.Log(0, "LocalAddressUpdate Handler")
go func() { go func() {
logger.Log(0, "LocalAddressUpdate handler") logger.Log(0, "LocalAddressUpdate handler")
mac, net, err := GetMacNetwork(msg.Topic()) id, err := GetID(msg.Topic())
if err != nil { if err != nil {
logger.Log(0, "error getting node.ID "+msg.Topic()) logger.Log(0, "error getting node.ID "+msg.Topic())
return return
} }
node, err := logic.GetNode(mac, net) node, err := logic.GetNodeByID(id)
if err != nil { if err != nil {
logger.Log(0, "error get node "+msg.Topic()) logger.Log(0, "error get node "+msg.Topic())
return return
@@ -168,28 +168,12 @@ var LocalAddressUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.M
}() }()
} }
func GetMacNetwork(topic string) (string, string, error) {
parts := strings.Split(topic, "/")
count := len(parts)
if count == 1 {
return "", "", errors.New("invalid topic")
}
macnet := strings.Split(parts[count-1], "-")
if len(macnet) != 2 {
return "", "", errors.New("topic id not in mac---network format")
}
return macnet[0], macnet[1], nil
}
func GetID(topic string) (string, error) { func GetID(topic string) (string, error) {
parts := strings.Split(topic, "/") parts := strings.Split(topic, "/")
count := len(parts) count := len(parts)
if count == 1 { if count == 1 {
return "", errors.New("invalid topic") return "", errors.New("invalid topic")
} }
macnet := strings.Split(parts[count-1], "-") //the last part of the topic will be the node.ID
if len(macnet) != 2 { return parts[count], nil
return "", errors.New("topic id not in mac---network format")
}
return macnet[0] + "###" + macnet[1], nil
} }

View File

@@ -7,7 +7,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"strings"
"syscall" "syscall"
"time" "time"
@@ -58,9 +57,6 @@ func Netclient(ctx context.Context, network string) {
var cfg config.ClientConfig var cfg config.ClientConfig
cfg.Network = network cfg.Network = network
cfg.ReadConfig() cfg.ReadConfig()
//fix NodeID to remove ### so NodeID can be used as message topic
//remove with GRA-73
cfg.Node.ID = strings.Replace(cfg.Node.ID, "###", "-", 1)
ncutils.Log("daemon started for network:" + network) ncutils.Log("daemon started for network:" + network)
client := SetupMQTT(cfg) client := SetupMQTT(cfg)
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
@@ -216,9 +212,6 @@ func Checkin(ctx context.Context, cfg config.ClientConfig, network string) {
ncutils.Log("Checkin running") ncutils.Log("Checkin running")
//read latest config //read latest config
cfg.ReadConfig() cfg.ReadConfig()
//fix NodeID to remove ### so NodeID can be used as message topic
//remove with GRA-73
cfg.Node.ID = strings.Replace(cfg.Node.ID, "###", "-", 1)
if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" { if cfg.Node.Roaming == "yes" && cfg.Node.IsStatic != "yes" {
extIP, err := ncutils.GetPublicIP() extIP, err := ncutils.GetPublicIP()
if err != nil { if err != nil {
@@ -301,15 +294,15 @@ func Metrics(ctx context.Context, cfg config.ClientConfig, network string) {
//delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ?? //delay should be configuraable -> use cfg.Node.NetworkSettings.DefaultCheckInInterval ??
case <-time.After(time.Second * 60): case <-time.After(time.Second * 60):
ncutils.Log("Metrics collection running") ncutils.Log("Metrics collection running")
ncutils.Log("Metrics running")
wg, err := wgctrl.New() wg, err := wgctrl.New()
defer wg.Close()
if err != nil { if err != nil {
ncutils.Log("error getting devices " + err.Error()) ncutils.Log("error getting devices " + err.Error())
break break
} }
device, err := wg.Device(cfg.Node.Interface) device, err := wg.Device(cfg.Node.Interface)
if err != nil { if err != nil {
ncutils.Log("error readind wg device " + err.Error()) ncutils.Log("error reading wg device " + err.Error())
break break
} }
bytes, err := json.Marshal(device.Peers) bytes, err := json.Marshal(device.Peers)
@@ -321,8 +314,6 @@ func Metrics(ctx context.Context, cfg config.ClientConfig, network string) {
if token := client.Publish("metrics/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil { if token := client.Publish("metrics/"+cfg.Node.ID, 1, false, bytes); token.Wait() && token.Error() != nil {
ncutils.Log("error publishing metrics " + token.Error().Error()) ncutils.Log("error publishing metrics " + token.Error().Error())
} }
wg.Close()
client.Disconnect(250)
ncutils.Log("metrics collection complete") ncutils.Log("metrics collection complete")
} }
} }