mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-05 08:47:35 +08:00
publish peers on every keepalive
This commit is contained in:
9
mq/mq.go
9
mq/mq.go
@@ -18,7 +18,7 @@ import (
|
|||||||
"github.com/gravitl/netmaker/serverctl"
|
"github.com/gravitl/netmaker/serverctl"
|
||||||
)
|
)
|
||||||
|
|
||||||
const KEEPALIVE_TIMEOUT = 10 //timeout in seconds
|
const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
|
||||||
const MQ_DISCONNECT = 250
|
const MQ_DISCONNECT = 250
|
||||||
|
|
||||||
// DefaultHandler default message queue handler - only called when GetDebug == true
|
// DefaultHandler default message queue handler - only called when GetDebug == true
|
||||||
@@ -125,7 +125,7 @@ func PublishPeerUpdate(newNode *models.Node) error {
|
|||||||
if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
|
if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
|
||||||
logger.Log(1, "failed to publish peer update for node", node.ID)
|
logger.Log(1, "failed to publish peer update for node", node.ID)
|
||||||
} else {
|
} else {
|
||||||
logger.Log(1, fmt.Sprintf("sent peer update for network, %s and node, %s", node.Network, node.Name))
|
logger.Log(1, fmt.Sprintf("sent peer update for node %s on network: %s ", node.Name, node.Network))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -202,7 +202,10 @@ func Keepalive(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
serverNode, errN := logic.GetNodeByID(id)
|
serverNode, errN := logic.GetNodeByID(id)
|
||||||
if errN == nil && network.DefaultUDPHolePunch == "yes" && logic.ShouldPublishPeerPorts(&serverNode) {
|
if errN == nil {
|
||||||
|
if network.DefaultUDPHolePunch == "yes" {
|
||||||
|
logic.ShouldPublishPeerPorts(&serverNode)
|
||||||
|
}
|
||||||
err = PublishPeerUpdate(&serverNode)
|
err = PublishPeerUpdate(&serverNode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log(1, "error publishing udp port updates", err.Error())
|
logger.Log(1, "error publishing udp port updates", err.Error())
|
||||||
|
@@ -65,7 +65,7 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
|
|||||||
opts := mqtt.NewClientOptions()
|
opts := mqtt.NewClientOptions()
|
||||||
for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
|
for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs {
|
||||||
if server.Address != "" && server.IsLeader {
|
if server.Address != "" && server.IsLeader {
|
||||||
ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network))
|
// ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s", server.Address, cfg.Node.Network))
|
||||||
opts.AddBroker(server.Address + ":1883")
|
opts.AddBroker(server.Address + ":1883")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -335,7 +335,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien
|
|||||||
return
|
return
|
||||||
case <-time.After(time.Second * 150):
|
case <-time.After(time.Second * 150):
|
||||||
if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
|
if time.Since(keepalive[id]) > time.Second*200 { // more than 3+ minutes
|
||||||
ncutils.Log("server keepalive not recieved in more than minutes, resubscribe to message queue")
|
ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
|
||||||
err := Resubscribe(client, cfg)
|
err := Resubscribe(client, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ncutils.Log("closing " + err.Error())
|
ncutils.Log("closing " + err.Error())
|
||||||
|
Reference in New Issue
Block a user