diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index d6245c04..4bbc4109 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -10,6 +10,7 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -85,6 +86,10 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) if err != nil { return nil, err } + // notify other nodes on network of new peer + if err := mq.NewPeer(node); err != nil { + logger.Log(0, "failed to inform peers of new node "+err.Error()) + } err = runServerPeerUpdate(node.Network, true) if err != nil { diff --git a/main.go b/main.go index 3d527a0b..b48ed5f7 100644 --- a/main.go +++ b/main.go @@ -208,10 +208,6 @@ func runMessageQueue(wg *sync.WaitGroup) { client.Disconnect(240) logger.Log(0, "ping sub failed") } - if token := client.Subscribe("metrics/#", 0, mq.Metrics); token.Wait() && token.Error() != nil { - client.Disconnect(240) - logger.Log(0, "metrics sub failed") - } if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil { client.Disconnect(240) logger.Log(0, "metrics sub failed") diff --git a/mq/mq.go b/mq/mq.go index cb32b430..6cd80dcc 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -3,24 +3,24 @@ package mq import ( "encoding/json" "errors" + "net" + "strconv" "strings" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload())) } -var Metrics mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "Metrics Handler") - //TODOD -- handle metrics data ---- store to database? -} - var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { logger.Log(0, "Ping Handler: "+msg.Topic()) go func() { @@ -42,6 +42,9 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { return } node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node) ; err != nil { + logger.Log(0, "error updating node "+ err.Error()) + } logger.Log(0, "ping processed") // --TODO --set client version once feature is implemented. //node.SetClientVersion(msg.Payload()) @@ -63,6 +66,9 @@ var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Mess } node.PublicKey = key node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node) ; err != nil { + logger.Log(0, "error updating node "+ err.Error()) + } if err := UpdatePeers(client, node); err != nil { logger.Log(0, "error updating peers "+err.Error()) } @@ -86,36 +92,64 @@ var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { } node.Endpoint = ip node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node) ; err != nil { + logger.Log(0, "error updating node "+ err.Error()) + } if err != UpdatePeers(client, node) { logger.Log(0, "error updating peers "+err.Error()) } }() } -func UpdatePeers(client mqtt.Client, node models.Node) error { - peersToUpdate, err := logic.GetNetworkNodes(node.Network) +func UpdatePeers(client mqtt.Client, newnode models.Node) error { + networkNodes, err := logic.GetNetworkNodes(newnode.Network) if err != nil { - logger.Log(0, "error retrieving peers to be updated "+err.Error()) return err } - for _, peerToUpdate := range peersToUpdate { - peers, _, _, err := logic.GetServerPeers(&peerToUpdate) - if err != nil { - logger.Log(0, "error retrieving peers "+err.Error()) - return err - } - if peerToUpdate.ID == node.ID { - continue - } + keepalive, _ := time.ParseDuration(string(newnode.PersistentKeepalive)+"s") + for _, node := range networkNodes { + var peers []wgtypes.PeerConfig var peerUpdate models.PeerUpdate + for _, peer := range networkNodes{ + if peer.ID == node.ID { + //skip + continue + } + pubkey, err := wgtypes.ParseKey(peer.PublicKey) + if err != nil { + return err + } + if node.Endpoint == peer.Endpoint { + if node.LocalAddress != peer.LocalAddress && peer.LocalAddress != "" { + peer.Endpoint = peer.LocalAddress + }else { + continue + } + } + endpoint := peer.Endpoint + ":" + strconv.Itoa(int(peer.ListenPort)) + //fmt.Println("endpoint: ", endpoint, peer.Endpoint, peer.ListenPort) + address, err := net.ResolveUDPAddr("udp", endpoint) + if err != nil { + return err + } + //calculate Allowed IPs. + var peerData wgtypes.PeerConfig + peerData = wgtypes.PeerConfig{ + PublicKey: pubkey, + Endpoint: address, + PersistentKeepaliveInterval: &keepalive, + //AllowedIPs: allowedIPs + } + peers = append (peers, peerData) + } peerUpdate.Network = node.Network - peerUpdate.Peers = peers - data, err := json.Marshal(peerUpdate) + peerUpdate.Peers = peers + data, err := json.Marshal(&peerUpdate) if err != nil { logger.Log(0, "error marshaling peer update "+err.Error()) return err } - if token := client.Publish("/update/peers/"+peerToUpdate.ID, 0, false, data); token.Wait() && token.Error() != nil { + if token := client.Publish("/update/peers/"+node.ID, 0, false, data); token.Wait() && token.Error() != nil { logger.Log(0, "error sending peer updatte to no") return err } @@ -154,3 +188,19 @@ func GetID(topic string) (string, error) { //the last part of the topic will be the node.ID return parts[count-1], nil } + +func NewPeer(node models.Node) error { + opts := mqtt.NewClientOptions() + broker := servercfg.GetMessageQueueEndpoint() + logger.Log(0, "broker: "+broker) + opts.AddBroker(broker) + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + if err := UpdatePeers(client, node); err != nil { + return err + } + return nil +}