diff --git a/controllers/node.go b/controllers/node.go index f95ba1b0..4ea427ad 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -7,7 +7,6 @@ import ( "net/http" "strings" - "github.com/google/uuid" "github.com/gorilla/mux" proxy_models "github.com/gravitl/netclient/nmproxy/models" "github.com/gravitl/netmaker/database" @@ -442,6 +441,13 @@ func getNode(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } + hostPeerUpdate, err := logic.GetPeerUpdateForHost(host) + if err != nil && !database.IsEmptyRecord(err) { + logger.Log(0, r.Header.Get("user"), + fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err)) + logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) + return + } server := servercfg.GetServerInfo() network, err := logic.GetNetwork(node.Network) if err != nil { @@ -453,7 +459,9 @@ func getNode(w http.ResponseWriter, r *http.Request) { legacy := node.Legacy(host, &server, &network) response := models.NodeGet{ Node: *legacy, + Host: *host, Peers: peerUpdate.Peers, + HostPeers: hostPeerUpdate.Peers, ServerConfig: server, PeerIDs: peerUpdate.PeerIDs, } @@ -637,19 +645,18 @@ func createNode(w http.ResponseWriter, r *http.Request) { return } } - peerUpdate, err := logic.GetPeerUpdate(&data.Node, &data.Host) + hostPeerUpdate, err := logic.GetPeerUpdateForHost(&data.Host) if err != nil && !database.IsEmptyRecord(err) { logger.Log(0, r.Header.Get("user"), - fmt.Sprintf("error fetching wg peers config for node [ %s ]: %v", data.Node.ID.String(), err)) + fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err)) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } - data.Node.Peers = peerUpdate.Peers - response := models.NodeJoinResponse{ Node: data.Node, ServerConfig: server, - PeerIDs: peerUpdate.PeerIDs, + Host: data.Host, + Peers: hostPeerUpdate.Peers, } logger.Log(1, r.Header.Get("user"), "created new node", data.Host.Name, "on network", networkName) w.WriteHeader(http.StatusOK) @@ -1061,28 +1068,14 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { }, &node) } if fromNode { - // check if server should be removed from mq - // err is irrelevent - nodes, _ := logic.GetAllNodes() - var foundNode models.Node - for _, nodetocheck := range nodes { - if nodetocheck.HostID == node.HostID { - foundNode = nodetocheck - break - } - } - // TODO: Address how to remove host - if foundNode.HostID != uuid.Nil { - if err = logic.DissasociateNodeFromHost(&foundNode, host); err == nil { - currNets := logic.GetHostNetworks(host.ID.String()) - if len(currNets) > 0 { - mq.ModifyClient(&mq.MqClient{ - ID: host.ID.String(), - Text: host.Name, - Networks: currNets, - }) - } - } + // update networks for host mq client + currNets := logic.GetHostNetworks(host.ID.String()) + if len(currNets) > 0 { + mq.ModifyClient(&mq.MqClient{ + ID: host.ID.String(), + Text: host.Name, + Networks: currNets, + }) } } logic.ReturnSuccessResponse(w, r, nodeid+" deleted.") @@ -1091,12 +1084,11 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { runUpdates(&node, false) return } - go func() { - if err := mq.PublishPeerUpdate(node.Network, false); err != nil { + go func(network string) { + if err := mq.PublishPeerUpdate(network, false); err != nil { logger.Log(1, "error publishing peer update ", err.Error()) - return } - }() + }(node.Network) } diff --git a/logic/nodes.go b/logic/nodes.go index 85b7a936..d3691824 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -48,9 +48,9 @@ func GetNetworkNodes(network string) ([]models.Node, error) { // UpdateNode - takes a node and updates another node with it's values func UpdateNode(currentNode *models.Node, newNode *models.Node) error { - if newNode.Address.String() != currentNode.Address.String() { + if newNode.Address.IP.String() != currentNode.Address.IP.String() { if network, err := GetParentNetwork(newNode.Network); err == nil { - if !IsAddressInCIDR(newNode.Address.String(), network.AddressRange) { + if !IsAddressInCIDR(newNode.Address.IP.String(), network.AddressRange) { return fmt.Errorf("invalid address provided; out of network range for node %s", newNode.ID) } } diff --git a/logic/peers.go b/logic/peers.go index c2536d6b..b66401a6 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -193,6 +193,127 @@ func GetPeersForProxy(node *models.Node, onlyPeers bool) (proxy_models.ProxyMana return proxyPayload, nil } +// GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks +func GetPeerUpdateForHost(host *models.Host) (models.HostPeerUpdate, error) { + hostPeerUpdate := models.HostPeerUpdate{ + Network: make(map[string]models.NetworkInfo), + PeerIDs: make(models.HostPeerMap), + ServerVersion: servercfg.GetVersion(), + ServerAddrs: []models.ServerAddr{}, + } + log.Println("peer update for host ", host.ID.String()) + peerIndexMap := make(map[string]int) + for _, nodeID := range host.Nodes { + node, err := GetNodeByID(nodeID) + if err != nil { + continue + } + if !node.Connected { + continue + } + hostPeerUpdate.Network[node.Network] = models.NetworkInfo{ + DNS: getPeerDNS(node.Network), + } + currentPeers, err := GetNetworkNodes(node.Network) + if err != nil { + log.Println("no network nodes") + return models.HostPeerUpdate{}, err + } + for _, peer := range currentPeers { + if peer.ID == node.ID { + log.Println("peer update, skipping self") + //skip yourself + + continue + } + var peerConfig wgtypes.PeerConfig + peerHost, err := GetHost(peer.HostID.String()) + if err != nil { + log.Println("no peer host", err) + return models.HostPeerUpdate{}, err + } + + if !peer.Connected { + log.Println("peer update, skipping unconnected node") + //skip unconnected nodes + continue + } + if !nodeacls.AreNodesAllowed(nodeacls.NetworkID(node.Network), nodeacls.NodeID(node.ID.String()), nodeacls.NodeID(peer.ID.String())) { + log.Println("peer update, skipping node for acl") + //skip if not permitted by acl + continue + } + peerConfig.PublicKey = peerHost.PublicKey + peerConfig.PersistentKeepaliveInterval = &peer.PersistentKeepalive + peerConfig.ReplaceAllowedIPs = true + uselocal := false + if host.EndpointIP.String() == peerHost.EndpointIP.String() { + //peer is on same network + // set to localaddress + uselocal = true + if node.LocalAddress.IP == nil { + // use public endpint + uselocal = false + } + if node.LocalAddress.String() == peer.LocalAddress.String() { + uselocal = false + } + } + peerConfig.Endpoint = &net.UDPAddr{ + IP: peerHost.EndpointIP, + Port: peerHost.ListenPort, + } + if !host.ProxyEnabled && peerHost.ProxyEnabled { + peerConfig.Endpoint.Port = peerHost.ProxyListenPort + } + if uselocal { + peerConfig.Endpoint.IP = peer.LocalAddress.IP + } + allowedips := getNodeAllowedIPs(&peer, &node) + if peer.IsIngressGateway { + for _, entry := range peer.IngressGatewayRange { + _, cidr, err := net.ParseCIDR(string(entry)) + if err == nil { + allowedips = append(allowedips, *cidr) + } + } + } + if peer.IsRelay { + allowedips = append(allowedips, getRelayAllowedIPs(&node, &peer)...) + } + if peer.IsEgressGateway { + allowedips = append(allowedips, getEgressIPs(&node, &peer)...) + } + peerConfig.AllowedIPs = allowedips + + if _, ok := hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()]; !ok { + hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr) + hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig) + peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1 + hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ + ID: peer.ID.String(), + Address: peer.PrimaryAddress(), + Name: peerHost.Name, + Network: peer.Network, + } + } else { + peerAllowedIPs := hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs + peerAllowedIPs = append(peerAllowedIPs, allowedips...) + hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]].AllowedIPs = peerAllowedIPs + hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ + ID: peer.ID.String(), + Address: peer.PrimaryAddress(), + Name: peerHost.Name, + Network: peer.Network, + } + } + + } + } + + return hostPeerUpdate, nil +} + // GetPeerUpdate - gets a wireguard peer config for each peer of a node func GetPeerUpdate(node *models.Node, host *models.Host) (models.PeerUpdate, error) { log.Println("peer update for node ", node.ID) @@ -731,6 +852,7 @@ func getPeerDNS(network string) string { host, err := GetHost(node.HostID.String()) if err != nil { logger.Log(0, "error retrieving host for node", node.ID.String(), err.Error()) + continue } dns = dns + fmt.Sprintf("%s %s.%s\n", nodes[i].Address, host.Name, nodes[i].Network) } diff --git a/logic/zombie.go b/logic/zombie.go index e09e1116..92654985 100644 --- a/logic/zombie.go +++ b/logic/zombie.go @@ -34,7 +34,8 @@ func CheckZombies(newnode *models.Node, mac net.HardwareAddr) { for _, node := range nodes { host, err := GetHost(node.HostID.String()) if err != nil { - + // should we delete the node if host not found ?? + continue } if host.MacAddress.String() == mac.String() { logger.Log(0, "adding ", node.ID.String(), " to zombie list") diff --git a/models/api_node.go b/models/api_node.go index 88816ab8..a87c44e1 100644 --- a/models/api_node.go +++ b/models/api_node.go @@ -59,7 +59,6 @@ func (a *ApiNode) ConvertToServerNode(currentNode *Node) *Node { convertedNode.IsRelay = a.IsRelay convertedNode.IsRelayed = a.IsRelayed convertedNode.PendingDelete = a.PendingDelete - convertedNode.Peers = currentNode.Peers convertedNode.Failover = a.Failover convertedNode.IsEgressGateway = a.IsEgressGateway convertedNode.IsIngressGateway = a.IsIngressGateway diff --git a/models/metrics.go b/models/metrics.go index 11abdf4d..5227520e 100644 --- a/models/metrics.go +++ b/models/metrics.go @@ -37,11 +37,15 @@ type IDandAddr struct { Address string `json:"address" bson:"address" yaml:"address"` Name string `json:"name" bson:"name" yaml:"name"` IsServer string `json:"isserver" bson:"isserver" yaml:"isserver" validate:"checkyesorno"` + Network string `json:"network" bson:"network" yaml:"network" validate:"network"` } // PeerMap - peer map for ids and addresses in metrics type PeerMap map[string]IDandAddr +// HostPeerMap - host peer map for ids and addresses +type HostPeerMap map[string]map[string]IDandAddr + // MetricsMap - map for holding multiple metrics in memory type MetricsMap map[string]Metrics diff --git a/models/mqtt.go b/models/mqtt.go index eb05514e..3f723b54 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -16,6 +16,21 @@ type PeerUpdate struct { ProxyUpdate proxy_models.ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"` } +// HostPeerUpdate - struct for host peer updates +type HostPeerUpdate struct { + ServerVersion string `json:"serverversion" bson:"serverversion" yaml:"serverversion"` + ServerAddrs []ServerAddr `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"` + Network map[string]NetworkInfo `json:"network" bson:"network" yaml:"network"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` + PeerIDs HostPeerMap `json:"peerids" bson:"peerids" yaml:"peerids"` + ProxyUpdate proxy_models.ProxyManagerPayload `json:"proxy_update" bson:"proxy_update" yaml:"proxy_update"` +} + +// NetworkInfo - struct for network info +type NetworkInfo struct { + DNS string `json:"dns" bson:"dns" yaml:"dns"` +} + // KeyUpdate - key update struct type KeyUpdate struct { Network string `json:"network" bson:"network"` diff --git a/models/node.go b/models/node.go index 368e2da2..30920658 100644 --- a/models/node.go +++ b/models/node.go @@ -56,26 +56,25 @@ type Iface struct { // CommonNode - represents a commonn node data elements shared by netmaker and netclient type CommonNode struct { - ID uuid.UUID `json:"id" yaml:"id"` - HostID uuid.UUID `json:"hostid" yaml:"hostid"` - Network string `json:"network" yaml:"network"` - NetworkRange net.IPNet `json:"networkrange" yaml:"networkrange"` - NetworkRange6 net.IPNet `json:"networkrange6" yaml:"networkrange6"` - InternetGateway *net.UDPAddr `json:"internetgateway" yaml:"internetgateway"` - Server string `json:"server" yaml:"server"` - Connected bool `json:"connected" yaml:"connected"` - Address net.IPNet `json:"address" yaml:"address"` - Address6 net.IPNet `json:"address6" yaml:"address6"` - PostUp string `json:"postup" yaml:"postup"` - PostDown string `json:"postdown" yaml:"postdown"` - Action string `json:"action" yaml:"action"` - LocalAddress net.IPNet `json:"localaddress" yaml:"localaddress"` - IsLocal bool `json:"islocal" yaml:"islocal"` - IsEgressGateway bool `json:"isegressgateway" yaml:"isegressgateway"` - IsIngressGateway bool `json:"isingressgateway" yaml:"isingressgateway"` - DNSOn bool `json:"dnson" yaml:"dnson"` - PersistentKeepalive time.Duration `json:"persistentkeepalive" yaml:"persistentkeepalive"` - Peers []wgtypes.PeerConfig `json:"peers" yaml:"peers"` + ID uuid.UUID `json:"id" yaml:"id"` + HostID uuid.UUID `json:"hostid" yaml:"hostid"` + Network string `json:"network" yaml:"network"` + NetworkRange net.IPNet `json:"networkrange" yaml:"networkrange"` + NetworkRange6 net.IPNet `json:"networkrange6" yaml:"networkrange6"` + InternetGateway *net.UDPAddr `json:"internetgateway" yaml:"internetgateway"` + Server string `json:"server" yaml:"server"` + Connected bool `json:"connected" yaml:"connected"` + Address net.IPNet `json:"address" yaml:"address"` + Address6 net.IPNet `json:"address6" yaml:"address6"` + PostUp string `json:"postup" yaml:"postup"` + PostDown string `json:"postdown" yaml:"postdown"` + Action string `json:"action" yaml:"action"` + LocalAddress net.IPNet `json:"localaddress" yaml:"localaddress"` + IsLocal bool `json:"islocal" yaml:"islocal"` + IsEgressGateway bool `json:"isegressgateway" yaml:"isegressgateway"` + IsIngressGateway bool `json:"isingressgateway" yaml:"isingressgateway"` + DNSOn bool `json:"dnson" yaml:"dnson"` + PersistentKeepalive time.Duration `json:"persistentkeepalive" yaml:"persistentkeepalive"` } // Node - a model of a network node @@ -365,7 +364,7 @@ func (node *LegacyNode) SetDefaultFailover() { // Node.Fill - fills other node data into calling node data if not set on calling node func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftables present newNode.ID = currentNode.ID - + newNode.HostID = currentNode.HostID // Revisit the logic for boolean values // TODO ---- !!!!!!!!!!!!!!!!!!!!!!!!!!!! // TODO ---- !!!!!!!!!!!!!!!!!!!!!!!!!! @@ -435,9 +434,6 @@ func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftable if newNode.Server == "" { newNode.Server = currentNode.Server } - if newNode.Connected != currentNode.Connected { - newNode.Connected = currentNode.Connected - } if newNode.DefaultACL == "" { newNode.DefaultACL = currentNode.DefaultACL } @@ -499,17 +495,23 @@ func (ln *LegacyNode) ConvertToNewNode() (*Host, *Node) { host.HostPass = ln.Password host.Name = ln.Name host.ListenPort = int(ln.ListenPort) - _, cidr, _ := net.ParseCIDR(ln.LocalAddress) - _, cidr, _ = net.ParseCIDR(ln.LocalRange) - host.LocalRange = *cidr + if _, cidr, err := net.ParseCIDR(ln.LocalAddress); err == nil { + host.LocalRange = *cidr + } else { + if _, cidr, err := net.ParseCIDR(ln.LocalRange); err == nil { + host.LocalRange = *cidr + } + } host.LocalListenPort = int(ln.LocalListenPort) host.ProxyListenPort = int(ln.ProxyListenPort) host.MTU = int(ln.MTU) host.PublicKey, _ = wgtypes.ParseKey(ln.PublicKey) host.MacAddress, _ = net.ParseMAC(ln.MacAddress) host.TrafficKeyPublic = ln.TrafficKeys.Mine - gateway, _ := net.ResolveUDPAddr("udp", ln.InternetGateway) - host.InternetGateway = *gateway + gateway, err := net.ResolveUDPAddr("udp", ln.InternetGateway) + if err == nil { + host.InternetGateway = *gateway + } id, _ := uuid.Parse(ln.ID) host.Nodes = append(host.Nodes, id.String()) host.Interfaces = ln.Interfaces @@ -519,16 +521,26 @@ func (ln *LegacyNode) ConvertToNewNode() (*Host, *Node) { id, _ := uuid.Parse(ln.ID) node.ID = id node.Network = ln.Network - _, cidr, _ := net.ParseCIDR(ln.NetworkSettings.AddressRange) - node.NetworkRange = *cidr - _, cidr, _ = net.ParseCIDR(ln.NetworkSettings.AddressRange6) - node.NetworkRange6 = *cidr + if _, cidr, err := net.ParseCIDR(ln.NetworkSettings.AddressRange); err == nil { + node.NetworkRange = *cidr + } + if _, cidr, err := net.ParseCIDR(ln.NetworkSettings.AddressRange6); err == nil { + node.NetworkRange6 = *cidr + } node.Server = ln.Server node.Connected = parseBool(ln.Connected) - _, cidr, _ = net.ParseCIDR(ln.Address) - node.Address = *cidr - _, cidr, _ = net.ParseCIDR(ln.Address6) - node.Address6 = *cidr + if ln.Address != "" { + node.Address = net.IPNet{ + IP: net.ParseIP(ln.Address), + Mask: net.CIDRMask(32, 32), + } + } + if ln.Address6 != "" { + node.Address = net.IPNet{ + IP: net.ParseIP(ln.Address6), + Mask: net.CIDRMask(128, 128), + } + } node.PostUp = ln.PostUp node.PostDown = ln.PostDown node.Action = ln.Action diff --git a/models/structs.go b/models/structs.go index 970f9fd6..bb88b16b 100644 --- a/models/structs.go +++ b/models/structs.go @@ -205,16 +205,17 @@ type NodeGet struct { Node LegacyNode `json:"node" bson:"node" yaml:"node"` Host Host `json:"host" yaml:"host"` Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` + HostPeers []wgtypes.PeerConfig `json:"host_peers" bson:"host_peers" yaml:"host_peers"` ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"` PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"` } // NodeJoinResponse data returned to node in response to join type NodeJoinResponse struct { - Node Node `json:"node" bson:"node" yaml:"node"` - Host Host `json:"host" yaml:"host"` - ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"` - PeerIDs PeerMap `json:"peerids,omitempty" bson:"peerids,omitempty" yaml:"peerids,omitempty"` + Node Node `json:"node" bson:"node" yaml:"node"` + Host Host `json:"host" yaml:"host"` + ServerConfig ServerConfig `json:"serverconfig" bson:"serverconfig" yaml:"serverconfig"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` } // ServerConfig - struct for dealing with the server information for a netclient diff --git a/mq/dynsec_clients.go b/mq/dynsec_clients.go index d9653d4d..3f122a39 100644 --- a/mq/dynsec_clients.go +++ b/mq/dynsec_clients.go @@ -13,7 +13,7 @@ func ModifyClient(client *MqClient) error { roles := []MqDynSecRole{ { - Rolename: HostRole, + Rolename: HostGenericRole, Priority: -1, }, } @@ -43,6 +43,7 @@ func ModifyClient(client *MqClient) error { // DeleteMqClient - removes a client from the DynSec system func DeleteMqClient(hostID string) error { + deleteHostRole(hostID) event := MqDynsecPayload{ Commands: []MqDynSecCmd{ { @@ -57,9 +58,17 @@ func DeleteMqClient(hostID string) error { // CreateMqClient - creates an MQ DynSec client func CreateMqClient(client *MqClient) error { + err := createHostRole(client.ID) + if err != nil { + return err + } roles := []MqDynSecRole{ { - Rolename: HostRole, + Rolename: HostGenericRole, + Priority: -1, + }, + { + Rolename: getHostRoleName(client.ID), Priority: -1, }, } diff --git a/mq/dynsec_helper.go b/mq/dynsec_helper.go index 60c6e790..746492d9 100644 --- a/mq/dynsec_helper.go +++ b/mq/dynsec_helper.go @@ -19,8 +19,8 @@ const ( exporterRole = "exporter" // constant for node role NodeRole = "node" - // HostRole constant for host role - HostRole = "host" + // HostGenericRole constant for host role + HostGenericRole = "host" // const for dynamic security file dynamicSecurityFile = "dynamic-security.json" @@ -66,7 +66,7 @@ var ( Acls: fetchServerAcls(), }, { - Rolename: HostRole, + Rolename: HostGenericRole, Acls: fetchNodeAcls(), }, exporterMQRole, @@ -169,6 +169,18 @@ func ListClients(client mqtt.Client) (ListClientsData, error) { return resp, errors.New("resp not found") } +// fetches host related acls +func fetchHostAcls(hostID string) []Acl { + return []Acl{ + { + AclType: "publishClientReceive", + Topic: fmt.Sprintf("peers/host/%s/#", hostID), + Priority: -1, + Allow: true, + }, + } +} + // FetchNetworkAcls - fetches network acls func FetchNetworkAcls(network string) []Acl { return []Acl{ @@ -220,6 +232,20 @@ func DeleteNetworkRole(network string) error { return publishEventToDynSecTopic(event) } +func deleteHostRole(hostID string) error { + // Deletes the hostID role from MQ + event := MqDynsecPayload{ + Commands: []MqDynSecCmd{ + { + Command: DeleteRoleCmd, + RoleName: getHostRoleName(hostID), + }, + }, + } + + return publishEventToDynSecTopic(event) +} + // CreateNetworkRole - createss a network role from DynSec system func CreateNetworkRole(network string) error { // Create Role with acls for the network @@ -237,6 +263,27 @@ func CreateNetworkRole(network string) error { return publishEventToDynSecTopic(event) } +// creates role for the host with ID. +func createHostRole(hostID string) error { + // Create Role with acls for the host + event := MqDynsecPayload{ + Commands: []MqDynSecCmd{ + { + Command: CreateRoleCmd, + RoleName: getHostRoleName(hostID), + Textname: "host role with Acls for hosts", + Acls: fetchHostAcls(hostID), + }, + }, + } + + return publishEventToDynSecTopic(event) +} + +func getHostRoleName(hostID string) string { + return fmt.Sprintf("host-%s", hostID) +} + // serverAcls - fetches server role related acls func fetchServerAcls() []Acl { return []Acl{ @@ -252,6 +299,12 @@ func fetchServerAcls() []Acl { Priority: -1, Allow: true, }, + { + AclType: "publishClientSend", + Topic: "peers/host/#", + Priority: -1, + Allow: true, + }, { AclType: "publishClientSend", Topic: "update/#", diff --git a/mq/handlers.go b/mq/handlers.go index 4ea670b1..a1d97b5c 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -90,19 +90,20 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) return } - var newNode models.Node - if err := json.Unmarshal(decrypted, &newNode); err != nil { + var oldNode models.LegacyNode + if err := json.Unmarshal(decrypted, &oldNode); err != nil { logger.Log(1, "error unmarshaling payload ", err.Error()) return } - ifaceDelta := logic.IfaceDelta(¤tNode, &newNode) + _, newNode := oldNode.ConvertToNewNode() + ifaceDelta := logic.IfaceDelta(¤tNode, newNode) if servercfg.Is_EE && ifaceDelta { if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID.String(), currentNode.Network); err != nil { logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network) } } newNode.SetLastCheckIn() - if err := logic.UpdateNode(¤tNode, &newNode); err != nil { + if err := logic.UpdateNode(¤tNode, newNode); err != nil { logger.Log(1, "error saving node", err.Error()) return } @@ -165,9 +166,13 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { if shouldUpdate { logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") - if err = PublishSinglePeerUpdate(¤tNode); err != nil { - logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) + host, err := logic.GetHost(currentNode.HostID.String()) + if err == nil { + if err = PublishSingleHostUpdate(host); err != nil { + logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) + } } + } logger.Log(1, "updated node metrics", id) diff --git a/mq/publishers.go b/mq/publishers.go index 164adc7d..78cc86a4 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -14,20 +14,21 @@ import ( "github.com/gravitl/netmaker/serverctl" ) -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node +// PublishPeerUpdate --- determines and publishes a peer update to all the hosts func PublishPeerUpdate(network string, publishToSelf bool) error { if !servercfg.IsMessageQueueBackend() { return nil } - networkNodes, err := logic.GetNetworkNodes(network) + + hosts, err := logic.GetAllHosts() if err != nil { - logger.Log(1, "err getting Network Nodes", err.Error()) + logger.Log(1, "err getting all hosts", err.Error()) return err } - for _, node := range networkNodes { - err = PublishSinglePeerUpdate(&node) + for _, host := range hosts { + err = PublishSingleHostUpdate(&host) if err != nil { - logger.Log(1, "failed to publish peer update to node", node.ID.String(), "on network", node.Network, ":", err.Error()) + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } } return err @@ -47,24 +48,20 @@ func PublishProxyPeerUpdate(node *models.Node) error { return nil } -// PublishSinglePeerUpdate --- determines and publishes a peer update to one node -func PublishSinglePeerUpdate(node *models.Node) error { - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - return nil - } +// PublishSingleHostUpdate --- determines and publishes a peer update to one host +func PublishSingleHostUpdate(host *models.Host) error { - peerUpdate, err := logic.GetPeerUpdate(node, host) + peerUpdate, err := logic.GetPeerUpdateForHost(host) if err != nil { return err } if host.ProxyEnabled { - proxyUpdate, err := logic.GetPeersForProxy(node, false) - if err != nil { - return err - } - proxyUpdate.Action = proxy_models.AddNetwork - peerUpdate.ProxyUpdate = proxyUpdate + // proxyUpdate, err := logic.GetPeersForProxy(node, false) + // if err != nil { + // return err + // } + // proxyUpdate.Action = proxy_models.AddNetwork + // peerUpdate.ProxyUpdate = proxyUpdate } @@ -72,36 +69,12 @@ func PublishSinglePeerUpdate(node *models.Node) error { if err != nil { return err } - return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) + return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data) } // PublishPeerUpdate --- publishes a peer update to all the peers of a node func PublishExtPeerUpdate(node *models.Node) error { - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - return nil - } - if !servercfg.IsMessageQueueBackend() { - return nil - } - peerUpdate, err := logic.GetPeerUpdate(node, host) - if err != nil { - return err - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - return err - } - if host.ProxyEnabled { - proxyUpdate, err := logic.GetPeersForProxy(node, false) - if err == nil { - peerUpdate.ProxyUpdate = proxyUpdate - } - } - if err = publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { - return err - } go PublishPeerUpdate(node.Network, false) return nil } @@ -126,7 +99,7 @@ func NodeUpdate(node *models.Node) error { logger.Log(2, "error marshalling node update ", err.Error()) return err } - if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { + if err = publish(host, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { logger.Log(2, "error publishing node update to peer ", node.ID.String(), err.Error()) return err } @@ -156,7 +129,7 @@ func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.No logger.Log(2, "error marshalling node update ", err.Error()) return err } - if err = publish(node, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil { + if err = publish(host, fmt.Sprintf("proxy/%s/%s", node.Network, node.ID), data); err != nil { logger.Log(2, "error publishing proxy update to peer ", node.ID.String(), err.Error()) return err } @@ -166,7 +139,7 @@ func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.No // sendPeers - retrieve networks, send peer ports to all peers func sendPeers() { - networks, err := logic.GetNetworks() + hosts, err := logic.GetAllHosts() if err != nil { logger.Log(1, "error retrieving networks for keepalive", err.Error()) } @@ -191,13 +164,12 @@ func sendPeers() { //collectServerMetrics(networks[:]) } - for _, network := range networks { + for _, host := range hosts { if force { logger.Log(2, "sending scheduled peer update (5 min)") - err = PublishPeerUpdate(network.NetID, false) + err = PublishSingleHostUpdate(&host) if err != nil { - logger.Log(1, "error publishing udp port updates for network", network.NetID) - logger.Log(1, err.Error()) + logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) } } } diff --git a/mq/util.go b/mq/util.go index eafa35db..2ca4942f 100644 --- a/mq/util.go +++ b/mq/util.go @@ -40,7 +40,7 @@ func decryptMsg(node *models.Node, msg []byte) ([]byte, error) { return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey) } -func encryptMsg(node *models.Node, msg []byte) ([]byte, error) { +func encryptMsg(host *models.Host, msg []byte) ([]byte, error) { // fetch server public key to be certain hasn't changed in transit trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() if trafficErr != nil { @@ -52,10 +52,6 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) { return nil, err } - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - return nil, err - } nodePubKey, err := ncutils.ConvertBytesToKey(host.TrafficKeyPublic) if err != nil { return nil, err @@ -68,8 +64,8 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) { return ncutils.Chunk(msg, nodePubKey, serverPrivKey) } -func publish(node *models.Node, dest string, msg []byte) error { - encrypted, encryptErr := encryptMsg(node, msg) +func publish(host *models.Host, dest string, msg []byte) error { + encrypted, encryptErr := encryptMsg(host, msg) if encryptErr != nil { return encryptErr }