added relay functionality to proxy

This commit is contained in:
Abhishek Kondur
2022-11-04 01:24:48 +05:30
parent 4e55242cb1
commit acae6c3aed
17 changed files with 430 additions and 144 deletions

View File

@@ -217,6 +217,14 @@ func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) {
}
return
}
if token := client.Subscribe(fmt.Sprintf("update/proxy/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(ProxyUpdate)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token.Error() == nil {
logger.Log(0, "network:", nodeCfg.Node.Network, "connection timeout")
} else {
logger.Log(0, "network:", nodeCfg.Node.Network, token.Error().Error())
}
return
}
logger.Log(3, fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", nodeCfg.Node.Name, nodeCfg.Node.Network, nodeCfg.Node.ID))
if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
logger.Log(0, "network", nodeCfg.Node.Network, token.Error().Error())

View File

@@ -31,6 +31,26 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//logger.Log(0, "Message: " + string(msg.Payload()))
}
func ProxyUpdate(client mqtt.Client, msg mqtt.Message) {
var nodeCfg config.ClientConfig
var proxyUpdate manager.ManagerAction
var network = strings.Split(msg.Topic(), "/")[2]
nodeCfg.Network = network
nodeCfg.ReadConfig()
data, dataErr := decryptMsg(&nodeCfg, msg.Payload())
if dataErr != nil {
return
}
err := json.Unmarshal([]byte(data), &proxyUpdate)
if err != nil {
logger.Log(0, "error unmarshalling node update data"+err.Error())
return
}
logger.Log(0, "---------> recieved a proxy update")
ProxyMgmChan <- &proxyUpdate
}
// NodeUpdate -- mqtt message handler for /update/<NodeID> topic
func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
var newNode models.Node
@@ -145,6 +165,12 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
// }
// }
// }
ProxyMgmChan <- &manager.ManagerAction{
Action: manager.AddInterface,
Payload: manager.ManagerPayload{
IsRelayed: newNode.IsRelay == "yes",
},
}
if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
doneErr := publishSignal(&nodeCfg, ncutils.DONE)
if doneErr != nil {
@@ -252,6 +278,8 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
Payload: manager.ManagerPayload{
InterfaceName: cfg.Node.Interface,
Peers: peerUpdate.Peers,
IsRelayed: peerUpdate.IsRelayed,
RelayedTo: peerUpdate.RelayTo,
},
}
logger.Log(0, "network:", cfg.Node.Network, "received peer update for node "+cfg.Node.Name+" "+cfg.Node.Network)